Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/repo.py: 44%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as public by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Repository access.
25This module contains the base class for git repositories
26(BaseRepo) and an implementation which uses a repository on
27local disk (Repo).
29"""
31import os
32import stat
33import sys
34import time
35import warnings
36from io import BytesIO
37from typing import (
38 TYPE_CHECKING,
39 Any,
40 BinaryIO,
41 Callable,
42 Dict,
43 FrozenSet,
44 Iterable,
45 List,
46 Optional,
47 Set,
48 Tuple,
49 Union,
50)
52if TYPE_CHECKING:
53 # There are no circular imports here, but we try to defer imports as long
54 # as possible to reduce start-up time for anything that doesn't need
55 # these imports.
56 from .config import ConfigFile, StackedConfig
57 from .index import Index
59from .errors import (
60 CommitError,
61 HookError,
62 NoIndexPresent,
63 NotBlobError,
64 NotCommitError,
65 NotGitRepository,
66 NotTagError,
67 NotTreeError,
68 RefFormatError,
69)
70from .file import GitFile
71from .hooks import (
72 CommitMsgShellHook,
73 Hook,
74 PostCommitShellHook,
75 PostReceiveShellHook,
76 PreCommitShellHook,
77)
78from .line_ending import BlobNormalizer, TreeBlobNormalizer
79from .object_store import (
80 DiskObjectStore,
81 MemoryObjectStore,
82 MissingObjectFinder,
83 ObjectStoreGraphWalker,
84 PackBasedObjectStore,
85 peel_sha,
86)
87from .objects import (
88 Blob,
89 Commit,
90 ObjectID,
91 ShaFile,
92 Tag,
93 Tree,
94 check_hexsha,
95 valid_hexsha,
96)
97from .pack import generate_unpacked_objects
98from .refs import (
99 ANNOTATED_TAG_SUFFIX, # noqa: F401
100 LOCAL_BRANCH_PREFIX,
101 LOCAL_TAG_PREFIX, # noqa: F401
102 SYMREF, # noqa: F401
103 DictRefsContainer,
104 DiskRefsContainer,
105 InfoRefsContainer, # noqa: F401
106 Ref,
107 RefsContainer,
108 _set_default_branch,
109 _set_head,
110 _set_origin_head,
111 check_ref_format, # noqa: F401
112 read_packed_refs, # noqa: F401
113 read_packed_refs_with_peeled, # noqa: F401
114 serialize_refs,
115 write_packed_refs, # noqa: F401
116)
# Names of well-known files and directories inside a Git control directory.
CONTROLDIR = ".git"  # control dir name for non-bare repositories
OBJECTDIR = "objects"  # object database subdirectory
REFSDIR = "refs"  # references subdirectory
REFSDIR_TAGS = "tags"  # refs/tags: tag references
REFSDIR_HEADS = "heads"  # refs/heads: branch references
INDEX_FILENAME = "index"  # the staging-area index file
# Linked-worktree support files: each worktree's gitdir records where its
# private control dir lives and which main control dir it shares.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directory skeleton (as path components) created inside a fresh control dir.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch that HEAD points at in newly initialized repositories.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        # Keep the offending identity bytestring around for callers to inspect.
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised when neither the environment nor the password database
    yields a usable username.
    """
151# TODO(jelmer): Cache?
152def _get_default_identity() -> Tuple[str, str]:
153 import socket
155 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
156 username = os.environ.get(name)
157 if username:
158 break
159 else:
160 username = None
162 try:
163 import pwd
164 except ImportError:
165 fullname = None
166 else:
167 try:
168 entry = pwd.getpwuid(os.getuid()) # type: ignore
169 except KeyError:
170 fullname = None
171 else:
172 if getattr(entry, "gecos", None):
173 fullname = entry.pw_gecos.split(",")[0]
174 else:
175 fullname = None
176 if username is None:
177 username = entry.pw_name
178 if not fullname:
179 if username is None:
180 raise DefaultIdentityNotFound("no username found")
181 fullname = username
182 email = os.environ.get("EMAIL")
183 if email is None:
184 if username is None:
185 raise DefaultIdentityNotFound("no username found")
186 email = f"{username}@{socket.gethostname()}"
187 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      config: Configuration stack to read user.name / user.email from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity of the form ``name <email>`` as bytes
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Only probe the host system when something is actually missing.
        # The previous code called _get_default_identity() unconditionally,
        # which could raise DefaultIdentityNotFound even when a complete
        # identity had already been supplied via environment or config.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Tolerate configured emails that already carry angle brackets.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity):
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    # Reject control characters that would corrupt the commit object.
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)
    # The identity must contain " <" separating name from email, and the
    # email part must be closed with ">".
    parts = identity.split(b" <", 1)
    if len(parts) != 2 or b">" not in parts[1]:
        raise InvalidUserIdentity(identity)
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> Dict[bytes, List[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: Dict[bytes, List[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        # Remaining whitespace-separated tokens, if any, are the parents.
        parents = fields[1].split() if len(fields) == 2 else []

        # Validate every sha before recording the graft.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Dict[bytes, List[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # One line per grafted commit; parentless grafts are just the sha.
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
312def _set_filesystem_hidden(path):
313 """Mark path as to be hidden if supported by platform and filesystem.
315 On win32 uses SetFileAttributesW api:
316 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
317 """
318 if sys.platform == "win32":
319 import ctypes
320 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
322 FILE_ATTRIBUTE_HIDDEN = 2
323 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
324 ("SetFileAttributesW", ctypes.windll.kernel32)
325 )
327 if isinstance(path, bytes):
328 path = os.fsdecode(path)
329 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
330 pass # Could raise or log `ctypes.WinError()` here
332 # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Resolve the parents of a commit, honouring grafts and shallow cut-offs."""

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a parents provider.

        Args:
          store: Object store used to load commits not passed in explicitly
          grafts: Optional dict mapping commit sha -> list of parent shas
            that override the commit's recorded parents
          shallows: Optional iterable of shallow commit shas whose parents
            are hidden (treated as having no parents)
        """
        self.store = store
        # Fix: avoid mutable default arguments ({} / []), which would be
        # shared between every instance created without explicit values.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

    def get_parents(self, commit_id, commit=None):
        """Return the parent shas for *commit_id*.

        Graft entries take precedence; shallow commits report no parents;
        otherwise the commit's own parents are returned, loading the commit
        from the store if it was not supplied.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents
class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Grafts (commit sha -> replacement parents) and hooks are kept
        # in-memory here; subclasses populate them from disk where relevant.
        self._graftpoints: Dict[bytes, List[bytes]] = {}
        self.hooks: Dict[str, Hook] = {}

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour
        return sys.platform != "win32"

    def _init_files(self, bare: bool, symlinks: Optional[bool] = None) -> None:
        """Initialize a default set of named files.

        Writes "description", "config" and "info/exclude" into the control
        dir via _put_named_file, so this works for any backend.
        """
        from .config import ConfigFile

        self._put_named_file("description", b"Unnamed repository")
        f = BytesIO()
        cf = ConfigFile()
        cf.set("core", "repositoryformatversion", "0")
        if self._determine_file_mode():
            cf.set("core", "filemode", True)
        else:
            cf.set("core", "filemode", False)

        if symlinks is None and not bare:
            symlinks = self._determine_symlinks()

        # Only record core.symlinks when they are known to be unavailable.
        if symlinks is False:
            cf.set("core", "symlinks", symlinks)

        cf.set("core", "bare", bare)
        cf.set("core", "logallrefupdates", True)
        cf.write_to_file(f)
        self._put_named_file("config", f.getvalue())
        self._put_named_file(os.path.join("info", "exclude"), b"")

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

    def _put_named_file(self, path: str, contents: bytes):
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

    def _del_named_file(self, path: str):
        """Delete a file in the control directory with the given name."""
        raise NotImplementedError(self._del_named_file)

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

    def fetch(self, target, determine_wants=None, progress=None, depth=None):
        """Fetch objects into another repository.

        Args:
          target: The target repository
          determine_wants: Optional function to determine what refs to
            fetch.
          progress: Optional progress function
          depth: Optional shallow fetch depth
        Returns: The local refs
        """
        if determine_wants is None:
            determine_wants = target.object_store.determine_wants_all
        count, pack_data = self.fetch_pack_data(
            determine_wants,
            target.get_graph_walker(),
            progress=progress,
            depth=depth,
        )
        target.object_store.add_pack_data(count, pack_data, progress)
        return self.get_refs()

    def fetch_pack_data(
        self,
        determine_wants,
        graph_walker,
        progress,
        get_tagged=None,
        depth=None,
    ):
        """Fetch the pack data required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: count and iterator over pack data
        """
        missing_objects = self.find_missing_objects(
            determine_wants, graph_walker, progress, get_tagged, depth=depth
        )
        remote_has = missing_objects.get_remote_has()
        object_ids = list(missing_objects)
        return len(object_ids), generate_unpacked_objects(
            self.object_store, object_ids, progress=progress, other_haves=remote_has
        )

    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        get_tagged=None,
        depth=None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        if depth not in (None, 0):
            raise NotImplementedError("depth not supported yet")

        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        shallows: FrozenSet[ObjectID] = getattr(graph_walker, "shallow", frozenset())
        unshallows: FrozenSet[ObjectID] = getattr(
            graph_walker, "unshallow", frozenset()
        )

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if shallows or unshallows:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                def get_remote_has(self):
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if shallows or unshallows:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=shallows)

        def get_parents(commit):
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=self.get_shallow(),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

    def generate_pack_data(
        self,
        have: List[ObjectID],
        want: List[ObjectID],
        progress: Optional[Callable[[str], None]] = None,
        ofs_delta: Optional[bool] = None,
    ):
        """Generate pack data objects for a set of wants/haves.

        Args:
          have: List of SHA1s of objects that should not be sent
          want: List of SHA1s of objects that should be sent
          ofs_delta: Whether OFS deltas can be included
          progress: Optional progress reporting method
        """
        return self.object_store.generate_pack_data(
            have,
            want,
            shallow=self.get_shallow(),
            progress=progress,
            ofs_delta=ofs_delta,
        )

    def get_graph_walker(
        self, heads: Optional[List[ObjectID]] = None
    ) -> ObjectStoreGraphWalker:
        """Retrieve a graph walker.

        A graph walker is used by a remote repository (or proxy)
        to find out which objects are present in this repository.

        Args:
          heads: Repository heads to use (optional)
        Returns: A graph walker object
        """
        if heads is None:
            # Default to all local branch heads that actually resolve to
            # objects present in the store.
            heads = [
                sha
                for sha in self.refs.as_dict(b"refs/heads").values()
                if sha in self.object_store
            ]
        parents_provider = ParentsProvider(self.object_store)
        return ObjectStoreGraphWalker(
            heads, parents_provider.get_parents, shallow=self.get_shallow()
        )

    def get_refs(self) -> Dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        return self.refs[b"HEAD"]

    def _get_object(self, sha, cls):
        # Fetch an object and verify it has the expected type, raising the
        # matching Not*Error otherwise.
        assert len(sha) in (20, 40)
        ret = self.get_object(sha)
        if not isinstance(ret, cls):
            if cls is Commit:
                raise NotCommitError(ret)
            elif cls is Blob:
                raise NotBlobError(ret)
            elif cls is Tree:
                raise NotTreeError(ret)
            elif cls is Tag:
                raise NotTagError(ret)
            else:
                raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
        return ret

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

    def parents_provider(self) -> ParentsProvider:
        # Provider that takes this repo's grafts and shallow set into account.
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> List[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object."""
        raise NotImplementedError(self.get_worktree_config)

    def get_description(self):
        """Retrieve the description for this repository.

        Returns: String with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)

    def set_description(self, description):
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

    def get_config_stack(self) -> "StackedConfig":
        """Return a config stack for this repository.

        This stack accesses the configuration for both this repository
        itself (.git/config) and the global configuration, which usually
        lives in ~/.gitconfig.

        Returns: `Config` instance for this repository
        """
        from .config import ConfigFile, StackedConfig

        local_config = self.get_config()
        backends: List[ConfigFile] = [local_config]
        # extensions.worktreeconfig opts in to an extra per-worktree layer.
        if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
            backends.append(self.get_worktree_config())

        backends += StackedConfig.default_backends()
        return StackedConfig(backends, writable=local_config)

    def get_shallow(self) -> Set[ObjectID]:
        """Get the set of shallow commits.

        Returns: Set of shallow commits.
        """
        f = self.get_named_file("shallow")
        if f is None:
            return set()
        with f:
            return {line.strip() for line in f}

    def update_shallow(self, new_shallow, new_unshallow):
        """Update the list of shallow objects.

        Args:
          new_shallow: Newly shallow objects
          new_unshallow: Newly no longer shallow objects
        """
        shallow = self.get_shallow()
        if new_shallow:
            shallow.update(new_shallow)
        if new_unshallow:
            shallow.difference_update(new_unshallow)
        # Rewrite or remove the "shallow" file to match the new set.
        if shallow:
            self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
        else:
            self._del_named_file("shallow")

    def get_peeled(self, ref: Ref) -> ObjectID:
        """Get the peeled value of a ref.

        Args:
          ref: The refname to peel.
        Returns: The fully-peeled SHA1 of a tag object, after peeling all
            intermediate tags; if the original ref does not point to a tag,
            this will equal the original SHA1.
        """
        cached = self.refs.get_peeled(ref)
        if cached is not None:
            return cached
        return peel_sha(self.object_store, self.refs[ref])[1].id

    def get_walker(self, include: Optional[List[bytes]] = None, *args, **kwargs):
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]
          exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
          order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
          reverse: If True, reverse the order of output, requiring O(n)
            memory.
          max_entries: The maximum number of entries to yield, or None for
            no limit.
          paths: Iterable of file or subtree paths to show entries for.
          rename_detector: diff.RenameDetector object for detecting
            renames.
          follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
          since: Timestamp to list commits after.
          until: Timestamp to list commits before.
          queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument, the
            Walker.
        Returns: A `Walker` object
        """
        from .walk import Walker

        if include is None:
            include = [self.head()]

        # Make the walker resolve parents through this repo so that grafts
        # and shallow boundaries are respected.
        kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)

        return Walker(self.object_store, include, *args, **kwargs)

    def __getitem__(self, name: Union[ObjectID, Ref]):
        """Retrieve a Git object by SHA1 or ref.

        Args:
          name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
          KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # A 20/40-byte name may be a (binary/hex) SHA; try the object store
        # first, then fall back to treating it as a ref name.
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                pass
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            raise KeyError(name) from exc

    def __contains__(self, name: bytes) -> bool:
        """Check if a specific Git object or ref is present.

        Args:
          name: Git object SHA1 or ref name
        """
        if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
            return name in self.object_store or name in self.refs
        else:
            return name in self.refs

    def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
        """Set a ref.

        Args:
          name: ref name
          value: Ref value - either a ShaFile object, or a hex sha
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            if isinstance(value, ShaFile):
                self.refs[name] = value.id
            elif isinstance(value, bytes):
                self.refs[name] = value
            else:
                raise TypeError(value)
        else:
            raise ValueError(name)

    def __delitem__(self, name: bytes) -> None:
        """Remove a ref.

        Args:
          name: Name of the ref to remove
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            del self.refs[name]
        else:
            raise ValueError(name)

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated: use the module-level get_user_identity() instead.
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

    def _add_graftpoints(self, updated_graftpoints: Dict[bytes, List[bytes]]):
        """Add or modify graftpoints.

        Args:
          updated_graftpoints: Dict of commit shas to list of parent shas
        """
        # Simple validation
        for commit, parents in updated_graftpoints.items():
            for sha in [commit, *parents]:
                check_hexsha(sha, "Invalid graftpoint")

        self._graftpoints.update(updated_graftpoints)

    def _remove_graftpoints(self, to_remove: List[bytes] = []) -> None:
        """Remove graftpoints.

        Args:
          to_remove: List of commit shas
        """
        # NOTE: the mutable default is harmless here since to_remove is only
        # iterated, never mutated.
        for sha in to_remove:
            del self._graftpoints[sha]

    def _read_heads(self, name):
        # Read a list of shas (e.g. MERGE_HEAD), one per line, skipping
        # blank lines; returns [] when the file does not exist.
        f = self.get_named_file(name)
        if f is None:
            return []
        with f:
            return [line.strip() for line in f.readlines() if line.strip()]

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Ref = b"HEAD",
        merge_heads: Optional[List[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch)
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        try:
            if not no_verify:
                self.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self.open_index()
            c.tree = index.commit(self.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = self._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # The commit-msg hook may rewrite the message; a None result keeps
        # the original.
        try:
            if no_verify:
                c.message = message
            else:
                c.message = self.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        keyid = sign if isinstance(sign, str) else None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            if sign:
                c.sign(keyid)
            self.object_store.add_object(c)
        else:
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                if sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet; create it atomically.
                c.parents = merge_heads
                if sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._del_named_file("MERGE_HEAD")

        try:
            self.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        return c.id
def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Everything after the prefix, minus a trailing newline, is the path.
    return contents[len(prefix):].rstrip("\n")
class UnsupportedVersion(Exception):
    """Unsupported repository version.

    Attributes:
      version: The unsupported ``core.repositoryformatversion`` value.
    """

    def __init__(self, version) -> None:
        self.version = version
        # Pass a message to Exception so str(exc) is informative instead of
        # the previous empty string.
        super().__init__(f"Unsupported repository format version: {version}")
class UnsupportedExtension(Exception):
    """Unsupported repository extension.

    Attributes:
      extension: Name of the unrecognized ``extensions.*`` config key.
    """

    def __init__(self, extension) -> None:
        self.extension = extension
        # Pass a message to Exception so str(exc) is informative instead of
        # the previous empty string.
        super().__init__(f"Unsupported extension: {extension}")
class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    path: str
    bare: bool

    def __init__(
        self,
        root: str,
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the working tree (non-bare) or the control
            directory itself (bare).
          object_store: Optional object store to use instead of the
            default disk-backed one.
          bare: Whether the repository is bare; autodetected when None.

        Raises:
          NotGitRepository: If no repository layout is found at ``root``.
          UnsupportedVersion: If ``core.repositoryformatversion`` is not 0/1.
          UnsupportedExtension: If an unrecognized extension is configured.
        """
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file or .git/objects dir means non-bare;
            # objects/ and refs/ directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # A .git *file* (submodule / linked worktree) points at the
                # real control directory.
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # Linked worktree: the commondir file points at the shared
            # control directory of the main working tree.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root
        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Refuse to open repositories relying on extensions we do not
        # implement; only worktreeconfig is tolerated.
        for extension, _value in config.items((b"extensions",)):
            if extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )
        refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )
        BaseRepo.__init__(self, object_store, refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # Shallow boundaries are treated the same way as graft points.
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
1221 def _write_reflog(
1222 self, ref, old_sha, new_sha, committer, timestamp, timezone, message
1223 ):
1224 from .reflog import format_reflog_line
1226 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1227 try:
1228 os.makedirs(os.path.dirname(path))
1229 except FileExistsError:
1230 pass
1231 if committer is None:
1232 config = self.get_config_stack()
1233 committer = self._get_user_identity(config)
1234 check_user_identity(committer)
1235 if timestamp is None:
1236 timestamp = int(time.time())
1237 if timezone is None:
1238 timezone = 0 # FIXME
1239 with open(path, "ab") as f:
1240 f.write(
1241 format_reflog_line(
1242 old_sha, new_sha, committer, timestamp, timezone, message
1243 )
1244 + b"\n"
1245 )
1247 @classmethod
1248 def discover(cls, start="."):
1249 """Iterate parent directories to discover a repository.
1251 Return a Repo object for the first parent directory that looks like a
1252 Git repository.
1254 Args:
1255 start: The directory to start discovery from (defaults to '.')
1256 """
1257 remaining = True
1258 path = os.path.abspath(start)
1259 while remaining:
1260 try:
1261 return cls(path)
1262 except NotGitRepository:
1263 path, remaining = os.path.split(path)
1264 raise NotGitRepository(
1265 "No git repository was found at {path}".format(**dict(path=start))
1266 )
    def controldir(self):
        """Return the path of the control directory."""
        # Resolved in __init__ (follows a .git file indirection if present).
        return self._controldir

    def commondir(self):
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        return self._commondir
1282 def _determine_file_mode(self):
1283 """Probe the file-system to determine whether permissions can be trusted.
1285 Returns: True if permissions can be trusted, False otherwise.
1286 """
1287 fname = os.path.join(self.path, ".probe-permissions")
1288 with open(fname, "w") as f:
1289 f.write("")
1291 st1 = os.lstat(fname)
1292 try:
1293 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1294 except PermissionError:
1295 return False
1296 st2 = os.lstat(fname)
1298 os.unlink(fname)
1300 mode_differs = st1.st_mode != st2.st_mode
1301 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1303 return mode_differs and st2_has_exec
1305 def _determine_symlinks(self):
1306 """Probe the filesystem to determine whether symlinks can be created.
1308 Returns: True if symlinks can be created, False otherwise.
1309 """
1310 # TODO(jelmer): Actually probe disk / look at filesystem
1311 return sys.platform != "win32"
1313 def _put_named_file(self, path, contents):
1314 """Write a file to the control dir with the given name and contents.
1316 Args:
1317 path: The path to the file, relative to the control dir.
1318 contents: A string to write to the file.
1319 """
1320 path = path.lstrip(os.path.sep)
1321 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1322 f.write(contents)
1324 def _del_named_file(self, path):
1325 try:
1326 os.unlink(os.path.join(self.controldir(), path))
1327 except FileNotFoundError:
1328 return
1330 def get_named_file(self, path, basedir=None):
1331 """Get a file from the control dir with a specific name.
1333 Although the filename should be interpreted as a filename relative to
1334 the control dir in a disk-based Repo, the object returned need not be
1335 pointing to a file in that location.
1337 Args:
1338 path: The path to the file, relative to the control dir.
1339 basedir: Optional argument that specifies an alternative to the
1340 control dir.
1341 Returns: An open file object, or None if the file does not exist.
1342 """
1343 # TODO(dborowitz): sanitize filenames, since this is used directly by
1344 # the dumb web serving code.
1345 if basedir is None:
1346 basedir = self.controldir()
1347 path = path.lstrip(os.path.sep)
1348 try:
1349 return open(os.path.join(basedir, path), "rb")
1350 except FileNotFoundError:
1351 return None
    def index_path(self):
        """Return path to the index file."""
        return os.path.join(self.controldir(), INDEX_FILENAME)

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        from .index import Index

        # Bare repositories never have an index.
        if not self.has_index():
            raise NoIndexPresent
        return Index(self.index_path())
1370 def has_index(self):
1371 """Check if an index is present."""
1372 # Bare repos must never have index files; non-bare repos may have a
1373 # missing index file, which is treated as empty.
1374 return not self.bare
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        Args:
          fs_paths: List of paths, relative to the repository path
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path as well as an iterable of paths.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self.open_index()
        blob_normalizer = self.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except OSError:
                # File no longer exists
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    # Directories are only staged when they map to an index
                    # entry (e.g. a submodule); otherwise drop any stale one.
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Sockets, fifos etc. cannot be tracked by git.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    blob = blob_from_path_and_stat(full_path, st)
                    # Apply check-in normalization (e.g. line endings) before
                    # the blob is written to the object store.
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()
    def unstage(self, fs_paths: List[str]):
        """Unstage specific file in the index
        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self.open_index()
        try:
            tree_id = self[b"HEAD"].tree
        except KeyError:
            # no head mean no commit in the repo
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            # Restore the entry to the HEAD version; stat fields come from
            # the working tree when the file still exists, else zero.
            index_entry = IndexEntry(
                ctime=(self[b"HEAD"].commit_time, 0),
                mtime=(self[b"HEAD"].commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self[tree_entry[1]].data),
                sha=tree_entry[1],
            )

            index[tree_path] = index_entry
        index.write()
1497 def clone(
1498 self,
1499 target_path,
1500 *,
1501 mkdir=True,
1502 bare=False,
1503 origin=b"origin",
1504 checkout=None,
1505 branch=None,
1506 progress=None,
1507 depth=None,
1508 symlinks=None,
1509 ) -> "Repo":
1510 """Clone this repository.
1512 Args:
1513 target_path: Target path
1514 mkdir: Create the target directory
1515 bare: Whether to create a bare repository
1516 checkout: Whether or not to check-out HEAD after cloning
1517 origin: Base name for refs in target repository
1518 cloned from this repository
1519 branch: Optional branch or tag to be used as HEAD in the new repository
1520 instead of this repository's HEAD.
1521 progress: Optional progress function
1522 depth: Depth at which to fetch
1523 symlinks: Symlinks setting (default to autodetect)
1524 Returns: Created repository as `Repo`
1525 """
1526 encoded_path = os.fsencode(self.path)
1528 if mkdir:
1529 os.mkdir(target_path)
1531 try:
1532 if not bare:
1533 target = Repo.init(target_path, symlinks=symlinks)
1534 if checkout is None:
1535 checkout = True
1536 else:
1537 if checkout:
1538 raise ValueError("checkout and bare are incompatible")
1539 target = Repo.init_bare(target_path)
1541 try:
1542 target_config = target.get_config()
1543 target_config.set((b"remote", origin), b"url", encoded_path)
1544 target_config.set(
1545 (b"remote", origin),
1546 b"fetch",
1547 b"+refs/heads/*:refs/remotes/" + origin + b"/*",
1548 )
1549 target_config.write_to_path()
1551 ref_message = b"clone: from " + encoded_path
1552 self.fetch(target, depth=depth)
1553 target.refs.import_refs(
1554 b"refs/remotes/" + origin,
1555 self.refs.as_dict(b"refs/heads"),
1556 message=ref_message,
1557 )
1558 target.refs.import_refs(
1559 b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
1560 )
1562 head_chain, origin_sha = self.refs.follow(b"HEAD")
1563 origin_head = head_chain[-1] if head_chain else None
1564 if origin_sha and not origin_head:
1565 # set detached HEAD
1566 target.refs[b"HEAD"] = origin_sha
1567 else:
1568 _set_origin_head(target.refs, origin, origin_head)
1569 head_ref = _set_default_branch(
1570 target.refs, origin, origin_head, branch, ref_message
1571 )
1573 # Update target head
1574 if head_ref:
1575 head = _set_head(target.refs, head_ref, ref_message)
1576 else:
1577 head = None
1579 if checkout and head is not None:
1580 target.reset_index()
1581 except BaseException:
1582 target.close()
1583 raise
1584 except BaseException:
1585 if mkdir:
1586 import shutil
1588 shutil.rmtree(target_path)
1589 raise
1590 return target
1592 def reset_index(self, tree: Optional[bytes] = None):
1593 """Reset the index back to a specific tree.
1595 Args:
1596 tree: Tree SHA to reset to, None for current HEAD tree.
1597 """
1598 from .index import (
1599 build_index_from_tree,
1600 symlink,
1601 validate_path_element_default,
1602 validate_path_element_ntfs,
1603 )
1605 if tree is None:
1606 head = self[b"HEAD"]
1607 if isinstance(head, Tag):
1608 _cls, obj = head.object
1609 head = self.get_object(obj)
1610 tree = head.tree
1611 config = self.get_config()
1612 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
1613 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
1614 validate_path_element = validate_path_element_ntfs
1615 else:
1616 validate_path_element = validate_path_element_default
1617 if config.get_boolean(b"core", b"symlinks", True):
1618 symlink_fn = symlink
1619 else:
1621 def symlink_fn(source, target): # type: ignore
1622 with open(
1623 target, "w" + ("b" if isinstance(source, bytes) else "")
1624 ) as f:
1625 f.write(source)
1627 return build_index_from_tree(
1628 self.path,
1629 self.index_path(),
1630 self.object_store,
1631 tree,
1632 honor_filemode=honor_filemode,
1633 validate_path_element=validate_path_element,
1634 symlink_fn=symlink_fn,
1635 )
    def get_worktree_config(self) -> "ConfigFile":
        """Return the worktree-specific configuration (``config.worktree``).

        Returns: A `ConfigFile`; when the file does not exist yet, an empty
            `ConfigFile` with its ``path`` preset so it can be written back.
        """
        from .config import ConfigFile

        # NOTE(review): git documents config.worktree as living in $GIT_DIR
        # (the per-worktree control dir); reading it from commondir() looks
        # wrong for linked worktrees -- confirm intended behavior.
        path = os.path.join(self.commondir(), "config.worktree")
        try:
            return ConfigFile.from_path(path)
        except FileNotFoundError:
            cf = ConfigFile()
            cf.path = path
            return cf
1648 def get_config(self) -> "ConfigFile":
1649 """Retrieve the config object.
1651 Returns: `ConfigFile` object for the ``.git/config`` file.
1652 """
1653 from .config import ConfigFile
1655 path = os.path.join(self._commondir, "config")
1656 try:
1657 return ConfigFile.from_path(path)
1658 except FileNotFoundError:
1659 ret = ConfigFile()
1660 ret.path = path
1661 return ret
1663 def get_description(self):
1664 """Retrieve the description of this repository.
1666 Returns: A string describing the repository or None.
1667 """
1668 path = os.path.join(self._controldir, "description")
1669 try:
1670 with GitFile(path, "rb") as f:
1671 return f.read()
1672 except FileNotFoundError:
1673 return None
    def __repr__(self) -> str:
        # Debugging representation including the filesystem path.
        return f"<Repo at {self.path!r}>"

    def set_description(self, description):
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        # Stored in the "description" file in the control directory.
        self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path,
        controldir,
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Args:
          path: Repository path (working tree, or the control dir for bare)
          controldir: Control directory to populate
          bare: Whether the new repository is bare
          object_store: Optional pre-created object store
          config: Optional config used to look up ``init.defaultBranch``
          default_branch: Initial branch name; config/default when None
          symlinks: Whether symlinks are supported; autodetect when None
        Returns: The newly created repository instance
        """
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # HEAD starts as a symbolic ref to the (still unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks)
        return ret
    @classmethod
    def init(
        cls,
        path: str,
        *,
        mkdir: bool = False,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
    ) -> "Repo":
        """Create a new repository.

        Args:
          path: Path in which to create the repository
          mkdir: Whether to create the directory
          config: Optional config for ``init.defaultBranch`` lookup
          default_branch: Initial branch name; config/default when None
          symlinks: Whether symlinks are supported; autodetect when None
        Returns: `Repo` instance
        """
        if mkdir:
            os.mkdir(path)
        controldir = os.path.join(path, CONTROLDIR)
        os.mkdir(controldir)
        # Hide the .git directory on platforms that support it.
        _set_filesystem_hidden(controldir)
        return cls._init_maybe_bare(
            path,
            controldir,
            False,
            config=config,
            default_branch=default_branch,
            symlinks=symlinks,
        )
    @classmethod
    def _init_new_working_directory(cls, path, main_repo, identifier=None, mkdir=False):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Layout: <main>/.git/worktrees/<identifier> holds the per-worktree
        # control files; the new worktree's .git is a file pointing there.
        main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Cross-link worktree and control dir; the commondir file makes the
        # worktree share the main repository's object and ref storage.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(path)
        # Populate the new worktree from HEAD.
        r.reset_index()
        return r
    @classmethod
    def init_bare(
        cls, path, *, mkdir=False, object_store=None, config=None, default_branch=None
    ):
        """Create a new bare repository.

        ``path`` should already exist and be an empty directory.

        Args:
          path: Path to create bare repository in
        Returns: a `Repo` instance
        """
        if mkdir:
            os.mkdir(path)
        # For a bare repo, path and controldir coincide.
        return cls._init_maybe_bare(
            path,
            path,
            True,
            object_store=object_store,
            config=config,
            default_branch=default_branch,
        )

    # Alias kept for backwards compatibility.
    create = init_bare
    def close(self):
        """Close any files opened by this repository."""
        self.object_store.close()

    def __enter__(self):
        # Context-manager support: yields the repository itself.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release resources when leaving a ``with`` block.
        self.close()
    def get_blob_normalizer(self):
        """Return a BlobNormalizer object."""
        # TODO Parse the git attributes files
        git_attributes = {}
        config_stack = self.get_config_stack()
        try:
            # Use the HEAD tree so check-in normalization can compare against
            # what is already committed.
            tree = self.object_store[self.refs[b"HEAD"]].tree
            return TreeBlobNormalizer(
                config_stack,
                git_attributes,
                self.object_store,
                tree,
            )
        except KeyError:
            # No HEAD yet (empty repository): fall back to a plain normalizer.
            return BlobNormalizer(config_stack, git_attributes)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Initialize an empty in-memory repository."""
        from .config import ConfigFile

        # Reflog entries are collected as plain tuples in memory.
        self._reflog: List[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        self._named_files: Dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None
    def _append_reflog(self, *args):
        # Logger callback passed to the refs container; records each ref
        # update as a tuple.
        self._reflog.append(args)

    def set_description(self, description):
        """Set the in-memory repository description."""
        self._description = description

    def get_description(self):
        """Return the repository description, or None if unset."""
        return self._description
    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        return sys.platform != "win32"

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        return sys.platform != "win32"
1877 def _put_named_file(self, path, contents):
1878 """Write a file to the control dir with the given name and contents.
1880 Args:
1881 path: The path to the file, relative to the control dir.
1882 contents: A string to write to the file.
1883 """
1884 self._named_files[path] = contents
1886 def _del_named_file(self, path):
1887 try:
1888 del self._named_files[path]
1889 except KeyError:
1890 pass
1892 def get_named_file(self, path, basedir=None):
1893 """Get a file from the control dir with a specific name.
1895 Although the filename should be interpreted as a filename relative to
1896 the control dir in a disk-baked Repo, the object returned need not be
1897 pointing to a file in that location.
1899 Args:
1900 path: The path to the file, relative to the control dir.
1901 Returns: An open file object, or None if the file does not exist.
1902 """
1903 contents = self._named_files.get(path, None)
1904 if contents is None:
1905 return None
1906 return BytesIO(contents)
    def open_index(self):
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        # The in-memory config created in __init__.
        return self._config
    @classmethod
    def init_bare(cls, objects, refs):
        """Create a new bare repository in memory.

        Args:
          objects: Objects for the new repository,
            as iterable
          refs: Refs as dictionary, mapping names
            to object SHA1s
        """
        ret = cls()
        for obj in objects:
            ret.object_store.add_object(obj)
        for refname, sha in refs.items():
            # add_if_new avoids clobbering an already-added ref name.
            ret.refs.add_if_new(refname, sha)
        ret._init_files(bare=True)
        return ret