Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as public by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Iterable
38from io import BytesIO
39from typing import (
40 TYPE_CHECKING,
41 Any,
42 BinaryIO,
43 Callable,
44 Optional,
45 Union,
46)
48if TYPE_CHECKING:
49 # There are no circular imports here, but we try to defer imports as long
50 # as possible to reduce start-up time for anything that doesn't need
51 # these imports.
52 from .attrs import GitAttributes
53 from .config import ConditionMatcher, ConfigFile, StackedConfig
54 from .index import Index
55 from .notes import Notes
57from .errors import (
58 CommitError,
59 HookError,
60 NoIndexPresent,
61 NotBlobError,
62 NotCommitError,
63 NotGitRepository,
64 NotTagError,
65 NotTreeError,
66 RefFormatError,
67)
68from .file import GitFile
69from .hooks import (
70 CommitMsgShellHook,
71 Hook,
72 PostCommitShellHook,
73 PostReceiveShellHook,
74 PreCommitShellHook,
75)
76from .object_store import (
77 DiskObjectStore,
78 MemoryObjectStore,
79 MissingObjectFinder,
80 ObjectStoreGraphWalker,
81 PackBasedObjectStore,
82 find_shallow,
83 peel_sha,
84)
85from .objects import (
86 Blob,
87 Commit,
88 ObjectID,
89 ShaFile,
90 Tag,
91 Tree,
92 check_hexsha,
93 valid_hexsha,
94)
95from .pack import generate_unpacked_objects
96from .refs import (
97 ANNOTATED_TAG_SUFFIX, # noqa: F401
98 LOCAL_BRANCH_PREFIX,
99 LOCAL_TAG_PREFIX, # noqa: F401
100 SYMREF, # noqa: F401
101 DictRefsContainer,
102 DiskRefsContainer,
103 InfoRefsContainer, # noqa: F401
104 Ref,
105 RefsContainer,
106 _set_default_branch,
107 _set_head,
108 _set_origin_head,
109 check_ref_format, # noqa: F401
110 read_packed_refs, # noqa: F401
111 read_packed_refs_with_peeled, # noqa: F401
112 serialize_refs,
113 write_packed_refs, # noqa: F401
114)
# Name of the control directory inside a (non-bare) repository.
CONTROLDIR = ".git"
# Subdirectory of the control dir holding the object database.
OBJECTDIR = "objects"
# Subdirectory of the control dir holding references.
REFSDIR = "refs"
# Ref namespaces under REFSDIR.
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
# Name of the index file in the control dir.
INDEX_FILENAME = "index"
# Files/directories used by linked worktrees — presumably mirroring git's
# .git/worktrees layout; confirm against the worktree code that reads them.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created inside a fresh control dir (paths as components,
# relative to the control dir).
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used for HEAD in newly created repositories.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """Raised when a user identity string is not of the form 'user <email>'."""

    def __init__(self, identity) -> None:
        # Keep the offending identity available to callers for error reporting.
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity could be determined from the host."""
149# TODO(jelmer): Cache?
150def _get_default_identity() -> tuple[str, str]:
151 import socket
153 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
154 username = os.environ.get(name)
155 if username:
156 break
157 else:
158 username = None
160 try:
161 import pwd
162 except ImportError:
163 fullname = None
164 else:
165 try:
166 entry = pwd.getpwuid(os.getuid()) # type: ignore
167 except KeyError:
168 fullname = None
169 else:
170 if getattr(entry, "gecos", None):
171 fullname = entry.pw_gecos.split(",")[0]
172 else:
173 fullname = None
174 if username is None:
175 username = entry.pw_name
176 if not fullname:
177 if username is None:
178 raise DefaultIdentityNotFound("no username found")
179 fullname = username
180 email = os.environ.get("EMAIL")
181 if email is None:
182 if username is None:
183 raise DefaultIdentityNotFound("no username found")
184 email = f"{username}@{socket.gethostname()}"
185 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      config: Configuration stack to read user.name / user.email from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity of the form ``user <email>`` (bytes)
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Only consult the host system when something is still missing; the
        # previous unconditional call could raise DefaultIdentityNotFound even
        # though the environment/config already supplied a full identity.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Strip surrounding angle brackets so we don't emit ``<<email>>``.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``user <email>`` and contains no NUL or
    newline bytes.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        name_part, email_part = identity.split(b" <", 1)
    except ValueError as exc:
        # No " <" separator present at all.
        raise InvalidUserIdentity(identity) from exc
    if b">" not in email_part:
        raise InvalidUserIdentity(identity)
    for forbidden in (b"\0", b"\n"):
        if forbidden in identity:
            raise InvalidUserIdentity(identity)
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Parse graftpoint lines into a mapping.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
      <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts = {}
    for entry in graftpoints:
        fields = entry.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Reject malformed SHAs before recording the graft.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Serialize a graft dictionary to its on-disk byte representation.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
def _set_filesystem_hidden(path) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    On all other platforms this is currently a no-op.

    Args:
      path: Path (str or bytes) of the file/directory to hide.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        # Build a typed prototype so ctypes marshals the (wide string, DWORD)
        # arguments correctly when calling kernel32.
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        if isinstance(path, bytes):
            # The W-suffixed API takes wide strings; decode via the
            # filesystem encoding.
            path = os.fsdecode(path)
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Resolve the parents of a commit.

    Lookup order:
      1. graftpoints (replacement parents),
      2. shallow commits (treated as having no parents),
      3. the commit-graph (if the store provides one, for speed),
      4. the commit object itself.
    """

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a ParentsProvider.

        Args:
          store: Object store used to look up commit objects
          grafts: Optional dict mapping commit SHA to replacement parent SHAs
          shallows: Optional iterable of shallow commit SHAs
        """
        self.store = store
        # Fix: the previous mutable defaults (``grafts={}``, ``shallows=[]``)
        # were shared between all instances created without arguments.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set() if shallows is None else set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the list of parent SHAs for ``commit_id``.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional already-loaded commit object matching commit_id
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents
362class BaseRepo:
363 """Base class for a git repository.
365 This base class is meant to be used for Repository implementations that e.g.
366 work on top of a different transport than a standard filesystem path.
368 Attributes:
369 object_store: Dictionary-like object for accessing
370 the objects
371 refs: Dictionary-like object with the refs in this
372 repository
373 """
375 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
376 """Open a repository.
378 This shouldn't be called directly, but rather through one of the
379 base classes, such as MemoryRepo or Repo.
381 Args:
382 object_store: Object store to use
383 refs: Refs container to use
384 """
385 self.object_store = object_store
386 self.refs = refs
388 self._graftpoints: dict[bytes, list[bytes]] = {}
389 self.hooks: dict[str, Hook] = {}
391 def _determine_file_mode(self) -> bool:
392 """Probe the file-system to determine whether permissions can be trusted.
394 Returns: True if permissions can be trusted, False otherwise.
395 """
396 raise NotImplementedError(self._determine_file_mode)
398 def _determine_symlinks(self) -> bool:
399 """Probe the filesystem to determine whether symlinks can be created.
401 Returns: True if symlinks can be created, False otherwise.
402 """
403 # For now, just mimic the old behaviour
404 return sys.platform != "win32"
406 def _init_files(
407 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
408 ) -> None:
409 """Initialize a default set of named files."""
410 from .config import ConfigFile
412 self._put_named_file("description", b"Unnamed repository")
413 f = BytesIO()
414 cf = ConfigFile()
415 if format is None:
416 format = 0
417 if format not in (0, 1):
418 raise ValueError(f"Unsupported repository format version: {format}")
419 cf.set("core", "repositoryformatversion", str(format))
420 if self._determine_file_mode():
421 cf.set("core", "filemode", True)
422 else:
423 cf.set("core", "filemode", False)
425 if symlinks is None and not bare:
426 symlinks = self._determine_symlinks()
428 if symlinks is False:
429 cf.set("core", "symlinks", symlinks)
431 cf.set("core", "bare", bare)
432 cf.set("core", "logallrefupdates", True)
433 cf.write_to_file(f)
434 self._put_named_file("config", f.getvalue())
435 self._put_named_file(os.path.join("info", "exclude"), b"")
437 def get_named_file(self, path: str) -> Optional[BinaryIO]:
438 """Get a file from the control dir with a specific name.
440 Although the filename should be interpreted as a filename relative to
441 the control dir in a disk-based Repo, the object returned need not be
442 pointing to a file in that location.
444 Args:
445 path: The path to the file, relative to the control dir.
446 Returns: An open file object, or None if the file does not exist.
447 """
448 raise NotImplementedError(self.get_named_file)
450 def _put_named_file(self, path: str, contents: bytes) -> None:
451 """Write a file to the control dir with the given name and contents.
453 Args:
454 path: The path to the file, relative to the control dir.
455 contents: A string to write to the file.
456 """
457 raise NotImplementedError(self._put_named_file)
459 def _del_named_file(self, path: str) -> None:
460 """Delete a file in the control directory with the given name."""
461 raise NotImplementedError(self._del_named_file)
463 def open_index(self) -> "Index":
464 """Open the index for this repository.
466 Raises:
467 NoIndexPresent: If no index is present
468 Returns: The matching `Index`
469 """
470 raise NotImplementedError(self.open_index)
472 def fetch(
473 self, target, determine_wants=None, progress=None, depth: Optional[int] = None
474 ):
475 """Fetch objects into another repository.
477 Args:
478 target: The target repository
479 determine_wants: Optional function to determine what refs to
480 fetch.
481 progress: Optional progress function
482 depth: Optional shallow fetch depth
483 Returns: The local refs
484 """
485 if determine_wants is None:
486 determine_wants = target.object_store.determine_wants_all
487 count, pack_data = self.fetch_pack_data(
488 determine_wants,
489 target.get_graph_walker(),
490 progress=progress,
491 depth=depth,
492 )
493 target.object_store.add_pack_data(count, pack_data, progress)
494 return self.get_refs()
496 def fetch_pack_data(
497 self,
498 determine_wants,
499 graph_walker,
500 progress,
501 *,
502 get_tagged=None,
503 depth: Optional[int] = None,
504 ):
505 """Fetch the pack data required for a set of revisions.
507 Args:
508 determine_wants: Function that takes a dictionary with heads
509 and returns the list of heads to fetch.
510 graph_walker: Object that can iterate over the list of revisions
511 to fetch and has an "ack" method that will be called to acknowledge
512 that a revision is present.
513 progress: Simple progress function that will be called with
514 updated progress strings.
515 get_tagged: Function that returns a dict of pointed-to sha ->
516 tag sha for including tags.
517 depth: Shallow fetch depth
518 Returns: count and iterator over pack data
519 """
520 missing_objects = self.find_missing_objects(
521 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
522 )
523 if missing_objects is None:
524 return 0, iter([])
525 remote_has = missing_objects.get_remote_has()
526 object_ids = list(missing_objects)
527 return len(object_ids), generate_unpacked_objects(
528 self.object_store, object_ids, progress=progress, other_haves=remote_has
529 )
    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None in
          the shallow short-circuit path when there is nothing to send.
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot of the shallow set before this request changes it.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Stand-in with the same interface as MissingObjectFinder that
            # yields nothing.
            class DummyMissingObjectFinder:
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        # Parents are resolved against the pre-request shallow set so the
        # walk stops at the shallow boundary.
        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
622 def generate_pack_data(
623 self,
624 have: list[ObjectID],
625 want: list[ObjectID],
626 progress: Optional[Callable[[str], None]] = None,
627 ofs_delta: Optional[bool] = None,
628 ):
629 """Generate pack data objects for a set of wants/haves.
631 Args:
632 have: List of SHA1s of objects that should not be sent
633 want: List of SHA1s of objects that should be sent
634 ofs_delta: Whether OFS deltas can be included
635 progress: Optional progress reporting method
636 """
637 return self.object_store.generate_pack_data(
638 have,
639 want,
640 shallow=self.get_shallow(),
641 progress=progress,
642 ofs_delta=ofs_delta,
643 )
645 def get_graph_walker(
646 self, heads: Optional[list[ObjectID]] = None
647 ) -> ObjectStoreGraphWalker:
648 """Retrieve a graph walker.
650 A graph walker is used by a remote repository (or proxy)
651 to find out which objects are present in this repository.
653 Args:
654 heads: Repository heads to use (optional)
655 Returns: A graph walker object
656 """
657 if heads is None:
658 heads = [
659 sha
660 for sha in self.refs.as_dict(b"refs/heads").values()
661 if sha in self.object_store
662 ]
663 parents_provider = ParentsProvider(self.object_store)
664 return ObjectStoreGraphWalker(
665 heads,
666 parents_provider.get_parents,
667 shallow=self.get_shallow(),
668 update_shallow=self.update_shallow,
669 )
671 def get_refs(self) -> dict[bytes, bytes]:
672 """Get dictionary with all refs.
674 Returns: A ``dict`` mapping ref names to SHA1s
675 """
676 return self.refs.as_dict()
678 def head(self) -> bytes:
679 """Return the SHA1 pointed at by HEAD."""
680 return self.refs[b"HEAD"]
682 def _get_object(self, sha, cls):
683 assert len(sha) in (20, 40)
684 ret = self.get_object(sha)
685 if not isinstance(ret, cls):
686 if cls is Commit:
687 raise NotCommitError(ret)
688 elif cls is Blob:
689 raise NotBlobError(ret)
690 elif cls is Tree:
691 raise NotTreeError(ret)
692 elif cls is Tag:
693 raise NotTagError(ret)
694 else:
695 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
696 return ret
698 def get_object(self, sha: bytes) -> ShaFile:
699 """Retrieve the object with the specified SHA.
701 Args:
702 sha: SHA to retrieve
703 Returns: A ShaFile object
704 Raises:
705 KeyError: when the object can not be found
706 """
707 return self.object_store[sha]
709 def parents_provider(self) -> ParentsProvider:
710 return ParentsProvider(
711 self.object_store,
712 grafts=self._graftpoints,
713 shallows=self.get_shallow(),
714 )
716 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
717 """Retrieve the parents of a specific commit.
719 If the specific commit is a graftpoint, the graft parents
720 will be returned instead.
722 Args:
723 sha: SHA of the commit for which to retrieve the parents
724 commit: Optional commit matching the sha
725 Returns: List of parents
726 """
727 return self.parents_provider().get_parents(sha, commit)
729 def get_config(self) -> "ConfigFile":
730 """Retrieve the config object.
732 Returns: `ConfigFile` object for the ``.git/config`` file.
733 """
734 raise NotImplementedError(self.get_config)
736 def get_worktree_config(self) -> "ConfigFile":
737 """Retrieve the worktree config object."""
738 raise NotImplementedError(self.get_worktree_config)
740 def get_description(self) -> Optional[str]:
741 """Retrieve the description for this repository.
743 Returns: String with the description of the repository
744 as set by the user.
745 """
746 raise NotImplementedError(self.get_description)
748 def set_description(self, description) -> None:
749 """Set the description for this repository.
751 Args:
752 description: Text to set as description for this repository.
753 """
754 raise NotImplementedError(self.set_description)
756 def get_rebase_state_manager(self):
757 """Get the appropriate rebase state manager for this repository.
759 Returns: RebaseStateManager instance
760 """
761 raise NotImplementedError(self.get_rebase_state_manager)
763 def get_blob_normalizer(self):
764 """Return a BlobNormalizer object for checkin/checkout operations.
766 Returns: BlobNormalizer instance
767 """
768 raise NotImplementedError(self.get_blob_normalizer)
770 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
771 """Read gitattributes for the repository.
773 Args:
774 tree: Tree SHA to read .gitattributes from (defaults to HEAD)
776 Returns:
777 GitAttributes object that can be used to match paths
778 """
779 raise NotImplementedError(self.get_gitattributes)
781 def get_config_stack(self) -> "StackedConfig":
782 """Return a config stack for this repository.
784 This stack accesses the configuration for both this repository
785 itself (.git/config) and the global configuration, which usually
786 lives in ~/.gitconfig.
788 Returns: `Config` instance for this repository
789 """
790 from .config import ConfigFile, StackedConfig
792 local_config = self.get_config()
793 backends: list[ConfigFile] = [local_config]
794 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
795 backends.append(self.get_worktree_config())
797 backends += StackedConfig.default_backends()
798 return StackedConfig(backends, writable=local_config)
800 def get_shallow(self) -> set[ObjectID]:
801 """Get the set of shallow commits.
803 Returns: Set of shallow commits.
804 """
805 f = self.get_named_file("shallow")
806 if f is None:
807 return set()
808 with f:
809 return {line.strip() for line in f}
811 def update_shallow(self, new_shallow, new_unshallow) -> None:
812 """Update the list of shallow objects.
814 Args:
815 new_shallow: Newly shallow objects
816 new_unshallow: Newly no longer shallow objects
817 """
818 shallow = self.get_shallow()
819 if new_shallow:
820 shallow.update(new_shallow)
821 if new_unshallow:
822 shallow.difference_update(new_unshallow)
823 if shallow:
824 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
825 else:
826 self._del_named_file("shallow")
828 def get_peeled(self, ref: Ref) -> ObjectID:
829 """Get the peeled value of a ref.
831 Args:
832 ref: The refname to peel.
833 Returns: The fully-peeled SHA1 of a tag object, after peeling all
834 intermediate tags; if the original ref does not point to a tag,
835 this will equal the original SHA1.
836 """
837 cached = self.refs.get_peeled(ref)
838 if cached is not None:
839 return cached
840 return peel_sha(self.object_store, self.refs[ref])[1].id
842 @property
843 def notes(self) -> "Notes":
844 """Access notes functionality for this repository.
846 Returns:
847 Notes object for accessing notes
848 """
849 from .notes import Notes
851 return Notes(self.object_store, self.refs)
853 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs):
854 """Obtain a walker for this repository.
856 Args:
857 include: Iterable of SHAs of commits to include along with their
858 ancestors. Defaults to [HEAD]
860 Keyword Args:
861 exclude: Iterable of SHAs of commits to exclude along with their
862 ancestors, overriding includes.
863 order: ORDER_* constant specifying the order of results.
864 Anything other than ORDER_DATE may result in O(n) memory usage.
865 reverse: If True, reverse the order of output, requiring O(n)
866 memory.
867 max_entries: The maximum number of entries to yield, or None for
868 no limit.
869 paths: Iterable of file or subtree paths to show entries for.
870 rename_detector: diff.RenameDetector object for detecting
871 renames.
872 follow: If True, follow path across renames/copies. Forces a
873 default rename_detector.
874 since: Timestamp to list commits after.
875 until: Timestamp to list commits before.
876 queue_cls: A class to use for a queue of commits, supporting the
877 iterator protocol. The constructor takes a single argument, the
878 Walker.
880 Returns: A `Walker` object
881 """
882 from .walk import Walker
884 if include is None:
885 include = [self.head()]
887 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)
889 return Walker(self.object_store, include, **kwargs)
891 def __getitem__(self, name: Union[ObjectID, Ref]):
892 """Retrieve a Git object by SHA1 or ref.
894 Args:
895 name: A Git object SHA1 or a ref name
896 Returns: A `ShaFile` object, such as a Commit or Blob
897 Raises:
898 KeyError: when the specified ref or object does not exist
899 """
900 if not isinstance(name, bytes):
901 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
902 if len(name) in (20, 40):
903 try:
904 return self.object_store[name]
905 except (KeyError, ValueError):
906 pass
907 try:
908 return self.object_store[self.refs[name]]
909 except RefFormatError as exc:
910 raise KeyError(name) from exc
912 def __contains__(self, name: bytes) -> bool:
913 """Check if a specific Git object or ref is present.
915 Args:
916 name: Git object SHA1 or ref name
917 """
918 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
919 return name in self.object_store or name in self.refs
920 else:
921 return name in self.refs
923 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
924 """Set a ref.
926 Args:
927 name: ref name
928 value: Ref value - either a ShaFile object, or a hex sha
929 """
930 if name.startswith(b"refs/") or name == b"HEAD":
931 if isinstance(value, ShaFile):
932 self.refs[name] = value.id
933 elif isinstance(value, bytes):
934 self.refs[name] = value
935 else:
936 raise TypeError(value)
937 else:
938 raise ValueError(name)
940 def __delitem__(self, name: bytes) -> None:
941 """Remove a ref.
943 Args:
944 name: Name of the ref to remove
945 """
946 if name.startswith(b"refs/") or name == b"HEAD":
947 del self.refs[name]
948 else:
949 raise ValueError(name)
951 def _get_user_identity(
952 self, config: "StackedConfig", kind: Optional[str] = None
953 ) -> bytes:
954 """Determine the identity to use for new commits."""
955 warnings.warn(
956 "use get_user_identity() rather than Repo._get_user_identity",
957 DeprecationWarning,
958 )
959 return get_user_identity(config)
961 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
962 """Add or modify graftpoints.
964 Args:
965 updated_graftpoints: Dict of commit shas to list of parent shas
966 """
967 # Simple validation
968 for commit, parents in updated_graftpoints.items():
969 for sha in [commit, *parents]:
970 check_hexsha(sha, "Invalid graftpoint")
972 self._graftpoints.update(updated_graftpoints)
974 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
975 """Remove graftpoints.
977 Args:
978 to_remove: List of commit shas
979 """
980 for sha in to_remove:
981 del self._graftpoints[sha]
983 def _read_heads(self, name):
984 f = self.get_named_file(name)
985 if f is None:
986 return []
987 with f:
988 return [line.strip() for line in f.readlines() if line.strip()]
990 def do_commit(
991 self,
992 message: Optional[bytes] = None,
993 committer: Optional[bytes] = None,
994 author: Optional[bytes] = None,
995 commit_timestamp=None,
996 commit_timezone=None,
997 author_timestamp=None,
998 author_timezone=None,
999 tree: Optional[ObjectID] = None,
1000 encoding: Optional[bytes] = None,
1001 ref: Optional[Ref] = b"HEAD",
1002 merge_heads: Optional[list[ObjectID]] = None,
1003 no_verify: bool = False,
1004 sign: bool = False,
1005 ):
1006 """Create a new commit.
1008 If not specified, committer and author default to
1009 get_user_identity(..., 'COMMITTER')
1010 and get_user_identity(..., 'AUTHOR') respectively.
1012 Args:
1013 message: Commit message
1014 committer: Committer fullname
1015 author: Author fullname
1016 commit_timestamp: Commit timestamp (defaults to now)
1017 commit_timezone: Commit timestamp timezone (defaults to GMT)
1018 author_timestamp: Author timestamp (defaults to commit
1019 timestamp)
1020 author_timezone: Author timestamp timezone
1021 (defaults to commit timestamp timezone)
1022 tree: SHA1 of the tree root to use (if not specified the
1023 current index will be committed).
1024 encoding: Encoding
1025 ref: Optional ref to commit to (defaults to current branch).
1026 If None, creates a dangling commit without updating any ref.
1027 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1028 no_verify: Skip pre-commit and commit-msg hooks
1029 sign: GPG Sign the commit (bool, defaults to False,
1030 pass True to use default GPG key,
1031 pass a str containing Key ID to use a specific GPG key)
1033 Returns:
1034 New commit SHA1
1035 """
1036 try:
1037 if not no_verify:
1038 self.hooks["pre-commit"].execute()
1039 except HookError as exc:
1040 raise CommitError(exc) from exc
1041 except KeyError: # no hook defined, silent fallthrough
1042 pass
1044 c = Commit()
1045 if tree is None:
1046 index = self.open_index()
1047 c.tree = index.commit(self.object_store)
1048 else:
1049 if len(tree) != 40:
1050 raise ValueError("tree must be a 40-byte hex sha string")
1051 c.tree = tree
1053 config = self.get_config_stack()
1054 if merge_heads is None:
1055 merge_heads = self._read_heads("MERGE_HEAD")
1056 if committer is None:
1057 committer = get_user_identity(config, kind="COMMITTER")
1058 check_user_identity(committer)
1059 c.committer = committer
1060 if commit_timestamp is None:
1061 # FIXME: Support GIT_COMMITTER_DATE environment variable
1062 commit_timestamp = time.time()
1063 c.commit_time = int(commit_timestamp)
1064 if commit_timezone is None:
1065 # FIXME: Use current user timezone rather than UTC
1066 commit_timezone = 0
1067 c.commit_timezone = commit_timezone
1068 if author is None:
1069 author = get_user_identity(config, kind="AUTHOR")
1070 c.author = author
1071 check_user_identity(author)
1072 if author_timestamp is None:
1073 # FIXME: Support GIT_AUTHOR_DATE environment variable
1074 author_timestamp = commit_timestamp
1075 c.author_time = int(author_timestamp)
1076 if author_timezone is None:
1077 author_timezone = commit_timezone
1078 c.author_timezone = author_timezone
1079 if encoding is None:
1080 try:
1081 encoding = config.get(("i18n",), "commitEncoding")
1082 except KeyError:
1083 pass # No dice
1084 if encoding is not None:
1085 c.encoding = encoding
1086 if message is None:
1087 # FIXME: Try to read commit message from .git/MERGE_MSG
1088 raise ValueError("No commit message specified")
1090 try:
1091 if no_verify:
1092 c.message = message
1093 else:
1094 c.message = self.hooks["commit-msg"].execute(message)
1095 if c.message is None:
1096 c.message = message
1097 except HookError as exc:
1098 raise CommitError(exc) from exc
1099 except KeyError: # no hook defined, message not modified
1100 c.message = message
1102 # Check if we should sign the commit
1103 should_sign = sign
1104 if sign is None:
1105 # Check commit.gpgSign configuration when sign is not explicitly set
1106 config = self.get_config_stack()
1107 try:
1108 should_sign = config.get_boolean((b"commit",), b"gpgSign")
1109 except KeyError:
1110 should_sign = False # Default to not signing if no config
1111 keyid = sign if isinstance(sign, str) else None
1113 if ref is None:
1114 # Create a dangling commit
1115 c.parents = merge_heads
1116 if should_sign:
1117 c.sign(keyid)
1118 self.object_store.add_object(c)
1119 else:
1120 try:
1121 old_head = self.refs[ref]
1122 c.parents = [old_head, *merge_heads]
1123 if should_sign:
1124 c.sign(keyid)
1125 self.object_store.add_object(c)
1126 ok = self.refs.set_if_equals(
1127 ref,
1128 old_head,
1129 c.id,
1130 message=b"commit: " + message,
1131 committer=committer,
1132 timestamp=commit_timestamp,
1133 timezone=commit_timezone,
1134 )
1135 except KeyError:
1136 c.parents = merge_heads
1137 if should_sign:
1138 c.sign(keyid)
1139 self.object_store.add_object(c)
1140 ok = self.refs.add_if_new(
1141 ref,
1142 c.id,
1143 message=b"commit: " + message,
1144 committer=committer,
1145 timestamp=commit_timestamp,
1146 timezone=commit_timezone,
1147 )
1148 if not ok:
1149 # Fail if the atomic compare-and-swap failed, leaving the
1150 # commit and all its objects as garbage.
1151 raise CommitError(f"{ref!r} changed during commit")
1153 self._del_named_file("MERGE_HEAD")
1155 try:
1156 self.hooks["post-commit"].execute()
1157 except HookError as e: # silent failure
1158 warnings.warn(f"post-commit hook failed: {e}", UserWarning)
1159 except KeyError: # no hook defined, silent fallthrough
1160 pass
1162 # Trigger auto GC if needed
1163 from .gc import maybe_auto_gc
1165 maybe_auto_gc(self)
1167 return c.id
def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path

    Raises:
      ValueError: if the file does not begin with the expected prefix.
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Strip the marker and any trailing newline; the rest is the path.
    return contents[len(prefix) :].rstrip("\n")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        """Create the exception.

        Args:
          version: The unsupported repositoryformatversion value.
        """
        # Pass a message to Exception so str(), repr() and pickling carry
        # useful information instead of an empty message.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        """Create the exception.

        Args:
          extension: The unsupported extension name (bytes or str).
        """
        # Include the extension in the message; callers pass either the raw
        # bytes config key or a formatted str, so use !r for both.
        super().__init__(f"Unsupported extension: {extension!r}")
        self.extension = extension
1199class Repo(BaseRepo):
1200 """A git repository backed by local disk.
1202 To open an existing repository, call the constructor with
1203 the path of the repository.
1205 To create a new repository, use the Repo.init class method.
1207 Note that a repository object may hold on to resources such
1208 as file handles for performance reasons; call .close() to free
1209 up those resources.
1211 Attributes:
1212 path: Path to the working copy (if it exists) or repository control
1213 directory (if the repository is bare)
1214 bare: Whether this is a bare repository
1215 """
1217 path: str
1218 bare: bool
    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if ``bare`` is None and neither a non-bare nor a
            bare layout can be detected at ``root``.
          UnsupportedVersion: if core.repositoryformatversion is not 0 or 1.
          UnsupportedExtension: if an unknown extensions.* key is configured.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Auto-detect the layout: a ".git" file (linked worktree) or a
            # ".git/objects" directory means non-bare; "objects/" and "refs/"
            # directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # ".git" is a file containing "gitdir: <path>" (worktree).
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # Linked worktree: "commondir" points (usually relatively) at the
            # main repository's control directory.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Missing key means the default format version 0.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any other extension is unknown to us; refuse to open.
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            # Replaces the DiskRefsContainer created above.
            self.refs = ReftableRefsContainer(self.commondir())
        BaseRepo.__init__(self, object_store, self.refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # Shallow-clone boundaries are treated like graft points as well.
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
1332 def _write_reflog(
1333 self, ref, old_sha, new_sha, committer, timestamp, timezone, message
1334 ) -> None:
1335 from .reflog import format_reflog_line
1337 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1338 try:
1339 os.makedirs(os.path.dirname(path))
1340 except FileExistsError:
1341 pass
1342 if committer is None:
1343 config = self.get_config_stack()
1344 committer = get_user_identity(config)
1345 check_user_identity(committer)
1346 if timestamp is None:
1347 timestamp = int(time.time())
1348 if timezone is None:
1349 timezone = 0 # FIXME
1350 with open(path, "ab") as f:
1351 f.write(
1352 format_reflog_line(
1353 old_sha, new_sha, committer, timestamp, timezone, message
1354 )
1355 + b"\n"
1356 )
1358 def read_reflog(self, ref):
1359 """Read reflog entries for a reference.
1361 Args:
1362 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1364 Yields:
1365 reflog.Entry objects in chronological order (oldest first)
1366 """
1367 from .reflog import read_reflog
1369 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1370 try:
1371 with open(path, "rb") as f:
1372 yield from read_reflog(f)
1373 except FileNotFoundError:
1374 return
1376 @classmethod
1377 def discover(cls, start="."):
1378 """Iterate parent directories to discover a repository.
1380 Return a Repo object for the first parent directory that looks like a
1381 Git repository.
1383 Args:
1384 start: The directory to start discovery from (defaults to '.')
1385 """
1386 remaining = True
1387 path = os.path.abspath(start)
1388 while remaining:
1389 try:
1390 return cls(path)
1391 except NotGitRepository:
1392 path, remaining = os.path.split(path)
1393 raise NotGitRepository(
1394 "No git repository was found at {path}".format(**dict(path=start))
1395 )
1397 def controldir(self):
1398 """Return the path of the control directory."""
1399 return self._controldir
1401 def commondir(self):
1402 """Return the path of the common directory.
1404 For a main working tree, it is identical to controldir().
1406 For a linked working tree, it is the control directory of the
1407 main working tree.
1408 """
1409 return self._commondir
1411 def _determine_file_mode(self):
1412 """Probe the file-system to determine whether permissions can be trusted.
1414 Returns: True if permissions can be trusted, False otherwise.
1415 """
1416 fname = os.path.join(self.path, ".probe-permissions")
1417 with open(fname, "w") as f:
1418 f.write("")
1420 st1 = os.lstat(fname)
1421 try:
1422 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1423 except PermissionError:
1424 return False
1425 st2 = os.lstat(fname)
1427 os.unlink(fname)
1429 mode_differs = st1.st_mode != st2.st_mode
1430 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1432 return mode_differs and st2_has_exec
1434 def _determine_symlinks(self):
1435 """Probe the filesystem to determine whether symlinks can be created.
1437 Returns: True if symlinks can be created, False otherwise.
1438 """
1439 # TODO(jelmer): Actually probe disk / look at filesystem
1440 return sys.platform != "win32"
1442 def _put_named_file(self, path, contents) -> None:
1443 """Write a file to the control dir with the given name and contents.
1445 Args:
1446 path: The path to the file, relative to the control dir.
1447 contents: A string to write to the file.
1448 """
1449 path = path.lstrip(os.path.sep)
1450 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1451 f.write(contents)
1453 def _del_named_file(self, path) -> None:
1454 try:
1455 os.unlink(os.path.join(self.controldir(), path))
1456 except FileNotFoundError:
1457 return
1459 def get_named_file(self, path, basedir=None):
1460 """Get a file from the control dir with a specific name.
1462 Although the filename should be interpreted as a filename relative to
1463 the control dir in a disk-based Repo, the object returned need not be
1464 pointing to a file in that location.
1466 Args:
1467 path: The path to the file, relative to the control dir.
1468 basedir: Optional argument that specifies an alternative to the
1469 control dir.
1470 Returns: An open file object, or None if the file does not exist.
1471 """
1472 # TODO(dborowitz): sanitize filenames, since this is used directly by
1473 # the dumb web serving code.
1474 if basedir is None:
1475 basedir = self.controldir()
1476 path = path.lstrip(os.path.sep)
1477 try:
1478 return open(os.path.join(basedir, path), "rb")
1479 except FileNotFoundError:
1480 return None
1482 def index_path(self):
1483 """Return path to the index file."""
1484 return os.path.join(self.controldir(), INDEX_FILENAME)
1486 def open_index(self) -> "Index":
1487 """Open the index for this repository.
1489 Raises:
1490 NoIndexPresent: If no index is present
1491 Returns: The matching `Index`
1492 """
1493 from .index import Index
1495 if not self.has_index():
1496 raise NoIndexPresent
1498 # Check for manyFiles feature configuration
1499 config = self.get_config_stack()
1500 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1501 skip_hash = False
1502 index_version = None
1504 if many_files:
1505 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1506 try:
1507 index_version_str = config.get(b"index", b"version")
1508 index_version = int(index_version_str)
1509 except KeyError:
1510 index_version = 4 # Default to version 4 for manyFiles
1511 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1512 else:
1513 # Check for explicit index settings
1514 try:
1515 index_version_str = config.get(b"index", b"version")
1516 index_version = int(index_version_str)
1517 except KeyError:
1518 index_version = None
1519 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1521 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
1523 def has_index(self) -> bool:
1524 """Check if an index is present."""
1525 # Bare repos must never have index files; non-bare repos may have a
1526 # missing index file, which is treated as empty.
1527 return not self.bare
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        Paths that no longer exist on disk are removed from the index;
        directories are staged via their directory entry when one can be
        built; regular files and symlinks are hashed (after checkin
        normalization) and recorded.

        Args:
          fs_paths: List of paths, relative to the repository path

        Raises:
          ValueError: if any path is absolute.
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path or an iterable of paths.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self.open_index()
        blob_normalizer = self.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except OSError:
                # File no longer exists
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        # Directory with no stageable representation:
                        # drop any stale entry for it.
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Unsupported file type (socket, FIFO, ...): remove
                    # any stale entry instead of staging it.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    # Regular file or symlink: hash contents and record stat.
                    blob = blob_from_path_and_stat(full_path, st)
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()
    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific file in the index
        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: if a path is neither in HEAD's tree nor in the index.
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self.open_index()
        try:
            tree_id = self[b"HEAD"].tree
        except KeyError:
            # no head mean no commit in the repo
            # With no commits, unstaging is simply removing the entries.
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            # Rebuild the index entry from the HEAD tree entry; stat fields
            # come from the working-tree file when it still exists, 0 otherwise.
            index_entry = IndexEntry(
                ctime=(self[b"HEAD"].commit_time, 0),
                mtime=(self[b"HEAD"].commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self[tree_entry[1]].data),
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()
    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
          ValueError: if both ``bare`` and ``checkout`` are requested.
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # checkout defaults to True for non-bare clones.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the "origin" remote in the clone.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Copy branches under refs/remotes/<origin>/ and tags as-is.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.reset_index()
            except BaseException:
                # Close the target before the outer handler removes its
                # directory, so no file handles are left open.
                target.close()
                raise
        except BaseException:
            # Roll back directory creation if the clone failed part-way.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
1747 def reset_index(self, tree: Optional[bytes] = None):
1748 """Reset the index back to a specific tree.
1750 Args:
1751 tree: Tree SHA to reset to, None for current HEAD tree.
1752 """
1753 from .index import (
1754 build_index_from_tree,
1755 symlink,
1756 validate_path_element_default,
1757 validate_path_element_hfs,
1758 validate_path_element_ntfs,
1759 )
1761 if tree is None:
1762 head = self[b"HEAD"]
1763 if isinstance(head, Tag):
1764 _cls, obj = head.object
1765 head = self.get_object(obj)
1766 tree = head.tree
1767 config = self.get_config()
1768 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
1769 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
1770 validate_path_element = validate_path_element_ntfs
1771 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
1772 validate_path_element = validate_path_element_hfs
1773 else:
1774 validate_path_element = validate_path_element_default
1775 if config.get_boolean(b"core", b"symlinks", True):
1776 symlink_fn = symlink
1777 else:
1779 def symlink_fn(source, target) -> None: # type: ignore
1780 with open(
1781 target, "w" + ("b" if isinstance(source, bytes) else "")
1782 ) as f:
1783 f.write(source)
1785 blob_normalizer = self.get_blob_normalizer()
1786 return build_index_from_tree(
1787 self.path,
1788 self.index_path(),
1789 self.object_store,
1790 tree,
1791 honor_filemode=honor_filemode,
1792 validate_path_element=validate_path_element,
1793 symlink_fn=symlink_fn,
1794 blob_normalizer=blob_normalizer,
1795 )
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere below any directory.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash means "this directory and everything in it".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # Detached or unborn HEAD: no branch to match.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        # gitdir: is case-sensitive; gitdir/i: is case-insensitive.
        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1866 def get_worktree_config(self) -> "ConfigFile":
1867 from .config import ConfigFile
1869 path = os.path.join(self.commondir(), "config.worktree")
1870 try:
1871 # Pass condition matchers for includeIf evaluation
1872 condition_matchers = self._get_config_condition_matchers()
1873 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1874 except FileNotFoundError:
1875 cf = ConfigFile()
1876 cf.path = path
1877 return cf
1879 def get_config(self) -> "ConfigFile":
1880 """Retrieve the config object.
1882 Returns: `ConfigFile` object for the ``.git/config`` file.
1883 """
1884 from .config import ConfigFile
1886 path = os.path.join(self._commondir, "config")
1887 try:
1888 # Pass condition matchers for includeIf evaluation
1889 condition_matchers = self._get_config_condition_matchers()
1890 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1891 except FileNotFoundError:
1892 ret = ConfigFile()
1893 ret.path = path
1894 return ret
1896 def get_rebase_state_manager(self):
1897 """Get the appropriate rebase state manager for this repository.
1899 Returns: DiskRebaseStateManager instance
1900 """
1901 import os
1903 from .rebase import DiskRebaseStateManager
1905 path = os.path.join(self.controldir(), "rebase-merge")
1906 return DiskRebaseStateManager(path)
1908 def get_description(self):
1909 """Retrieve the description of this repository.
1911 Returns: A string describing the repository or None.
1912 """
1913 path = os.path.join(self._controldir, "description")
1914 try:
1915 with GitFile(path, "rb") as f:
1916 return f.read()
1917 except FileNotFoundError:
1918 return None
1920 def __repr__(self) -> str:
1921 return f"<Repo at {self.path!r}>"
1923 def set_description(self, description) -> None:
1924 """Set the description for this repository.
1926 Args:
1927 description: Text to set as description for this repository.
1928 """
1929 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared implementation behind ``init`` and ``init_bare``.

        Creates the base control-directory layout, the object store, the
        initial HEAD symref and the repository's initial files.

        Args:
          path: Path to the repository (working tree, or control dir if bare)
          controldir: Path to the control directory to populate
          bare: Whether the new repository is bare
          object_store: Optional pre-created object store
          config: Optional config used to look up init.defaultBranch
          default_branch: Branch name for HEAD; defaults to the configured
            init.defaultBranch or DEFAULT_BRANCH
          symlinks: Symlink support setting forwarded to ``_init_files``
          format: Repository format version (defaults to 0)
        Returns: the newly opened repository instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # Point HEAD at the (still unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
1967 @classmethod
1968 def init(
1969 cls,
1970 path: Union[str, bytes, os.PathLike],
1971 *,
1972 mkdir: bool = False,
1973 config=None,
1974 default_branch=None,
1975 symlinks: Optional[bool] = None,
1976 format: Optional[int] = None,
1977 ) -> "Repo":
1978 """Create a new repository.
1980 Args:
1981 path: Path in which to create the repository
1982 mkdir: Whether to create the directory
1983 format: Repository format version (defaults to 0)
1984 Returns: `Repo` instance
1985 """
1986 path = os.fspath(path)
1987 if isinstance(path, bytes):
1988 path = os.fsdecode(path)
1989 if mkdir:
1990 os.mkdir(path)
1991 controldir = os.path.join(path, CONTROLDIR)
1992 os.mkdir(controldir)
1993 _set_filesystem_hidden(controldir)
1994 return cls._init_maybe_bare(
1995 path,
1996 controldir,
1997 False,
1998 config=config,
1999 default_branch=default_branch,
2000 symlinks=symlinks,
2001 format=format,
2002 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            # Default the worktree identifier to the directory's basename.
            identifier = os.path.basename(path)
        # Layout: <main>/.git/worktrees/<identifier> holds this worktree's
        # control files, while <path>/.git is a file pointing at it.
        main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            # Back-pointer from the worktree control dir to the .git file.
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            # The main repository's control dir is two levels up.
            f.write(b"../..\n")
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            # Start the worktree at the main repository's current HEAD.
            f.write(main_repo.head() + b"\n")
        r = cls(path)
        # Populate the new worktree's index from HEAD.
        r.reset_index()
        return r
2051 @classmethod
2052 def init_bare(
2053 cls,
2054 path: Union[str, bytes, os.PathLike],
2055 *,
2056 mkdir=False,
2057 object_store=None,
2058 config=None,
2059 default_branch=None,
2060 format: Optional[int] = None,
2061 ):
2062 """Create a new bare repository.
2064 ``path`` should already exist and be an empty directory.
2066 Args:
2067 path: Path to create bare repository in
2068 format: Repository format version (defaults to 0)
2069 Returns: a `Repo` instance
2070 """
2071 path = os.fspath(path)
2072 if isinstance(path, bytes):
2073 path = os.fsdecode(path)
2074 if mkdir:
2075 os.mkdir(path)
2076 return cls._init_maybe_bare(
2077 path,
2078 path,
2079 True,
2080 object_store=object_store,
2081 config=config,
2082 default_branch=default_branch,
2083 format=format,
2084 )
2086 create = init_bare
2088 def close(self) -> None:
2089 """Close any files opened by this repository."""
2090 self.object_store.close()
2092 def __enter__(self):
2093 return self
2095 def __exit__(self, exc_type, exc_val, exc_tb):
2096 self.close()
2098 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
2099 """Read .gitattributes file from working tree.
2101 Returns:
2102 Dictionary mapping file patterns to attributes
2103 """
2104 gitattributes = {}
2105 gitattributes_path = os.path.join(self.path, ".gitattributes")
2107 if os.path.exists(gitattributes_path):
2108 with open(gitattributes_path, "rb") as f:
2109 for line in f:
2110 line = line.strip()
2111 if not line or line.startswith(b"#"):
2112 continue
2114 parts = line.split()
2115 if len(parts) < 2:
2116 continue
2118 pattern = parts[0]
2119 attrs = {}
2121 for attr in parts[1:]:
2122 if attr.startswith(b"-"):
2123 # Unset attribute
2124 attrs[attr[1:]] = b"false"
2125 elif b"=" in attr:
2126 # Set to value
2127 key, value = attr.split(b"=", 1)
2128 attrs[key] = value
2129 else:
2130 # Set attribute
2131 attrs[attr] = b"true"
2133 gitattributes[pattern] = attrs
2135 return gitattributes
2137 def get_blob_normalizer(self):
2138 """Return a BlobNormalizer object."""
2139 from .filters import FilterBlobNormalizer, FilterRegistry
2141 # Get proper GitAttributes object
2142 git_attributes = self.get_gitattributes()
2143 config_stack = self.get_config_stack()
2145 # Create FilterRegistry with repo reference
2146 filter_registry = FilterRegistry(config_stack, self)
2148 # Return FilterBlobNormalizer which handles all filters including line endings
2149 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are accumulated in order: tree/HEAD ``.gitattributes``,
        then ``.git/info/attributes``, then the working-tree
        ``.gitattributes``.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the object it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing or non-tree object: skip tree-level attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
2216 def _sparse_checkout_file_path(self) -> str:
2217 """Return the path of the sparse-checkout file in this repo's control dir."""
2218 return os.path.join(self.controldir(), "info", "sparse-checkout")
2220 def configure_for_cone_mode(self) -> None:
2221 """Ensure the repository is configured for cone-mode sparse-checkout."""
2222 config = self.get_config()
2223 config.set((b"core",), b"sparseCheckout", b"true")
2224 config.set((b"core",), b"sparseCheckoutCone", b"true")
2225 config.write_to_path()
2227 def infer_cone_mode(self) -> bool:
2228 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2229 config = self.get_config()
2230 try:
2231 sc_cone = config.get((b"core",), b"sparseCheckoutCone")
2232 return sc_cone == b"true"
2233 except KeyError:
2234 # If core.sparseCheckoutCone is not set, default to False
2235 return False
2237 def get_sparse_checkout_patterns(self) -> list[str]:
2238 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2240 Returns:
2241 A list of patterns. Returns an empty list if the file is missing.
2242 """
2243 path = self._sparse_checkout_file_path()
2244 try:
2245 with open(path, encoding="utf-8") as f:
2246 return [line.strip() for line in f if line.strip()]
2247 except FileNotFoundError:
2248 return []
2250 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
2251 """Write the given sparse-checkout patterns into info/sparse-checkout.
2253 Creates the info/ directory if it does not exist.
2255 Args:
2256 patterns: A list of gitignore-style patterns to store.
2257 """
2258 info_dir = os.path.join(self.controldir(), "info")
2259 os.makedirs(info_dir, exist_ok=True)
2261 path = self._sparse_checkout_file_path()
2262 with open(path, "w", encoding="utf-8") as f:
2263 for pat in patterns:
2264 f.write(pat + "\n")
2266 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
2267 """Write the given cone-mode directory patterns into info/sparse-checkout.
2269 For each directory to include, add an inclusion line that "undoes" the prior
2270 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2271 Never add the same line twice.
2272 """
2273 patterns = ["/*", "!/*/"]
2274 if dirs:
2275 for d in dirs:
2276 d = d.strip("/")
2277 line = f"/{d}/"
2278 if d and line not in patterns:
2279 patterns.append(line)
2280 self.set_sparse_checkout_patterns(patterns)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries are accumulated in memory via _append_reflog.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None

    def _append_reflog(self, *args) -> None:
        # Reflog callback for the refs container: record entries in memory.
        self._reflog.append(args)

    def set_description(self, description) -> None:
        """Set the repository description."""
        self._description = description

    def get_description(self):
        """Return the repository description (or None if unset)."""
        return self._description

    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        return sys.platform != "win32"

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # NOTE: docstring previously duplicated _determine_file_mode's text;
        # this method reports symlink support, not permission trust.
        return sys.platform != "win32"

    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        self._named_files[path] = contents

    def _del_named_file(self, path) -> None:
        """Delete a named file from the control dir; missing files are ignored."""
        # pop() with a default replaces the try/except KeyError idiom.
        self._named_files.pop(path, None)

    def get_named_file(self, path, basedir=None):
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-backed Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
          basedir: Unused; present for interface compatibility.
        Returns: An open file object, or None if the file does not exist.
        """
        contents = self._named_files.get(path, None)
        if contents is None:
            return None
        return BytesIO(contents)

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        return self._config

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)

    def get_blob_normalizer(self):
        """Return a BlobNormalizer object for checkin/checkout operations."""
        from .filters import FilterBlobNormalizer, FilterRegistry

        # Get GitAttributes object (always empty for memory repos).
        git_attributes = self.get_gitattributes()
        config_stack = self.get_config_stack()

        # Create FilterRegistry with repo reference
        filter_registry = FilterRegistry(config_stack, self)

        # Return FilterBlobNormalizer which handles all filters
        return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Memory repos have no working tree or gitattributes files, so this
        always returns an empty GitAttributes.
        """
        from .attrs import GitAttributes

        return GitAttributes([])

    @classmethod
    def init_bare(cls, objects, refs, format: Optional[int] = None):
        """Create a new bare repository in memory.

        Args:
          objects: Objects for the new repository,
            as iterable
          refs: Refs as dictionary, mapping names
            to object SHA1s
          format: Repository format version (defaults to 0)
        """
        ret = cls()
        for obj in objects:
            ret.object_store.add_object(obj)
        for refname, sha in refs.items():
            ret.refs.add_if_new(refname, sha)
        ret._init_files(bare=True, format=format)
        return ret