Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import types
27import warnings
28from collections.abc import Iterable, Iterator
29from contextlib import suppress
30from typing import (
31 IO,
32 TYPE_CHECKING,
33 Any,
34 BinaryIO,
35 Callable,
36 Optional,
37 TypeVar,
38 Union,
39 cast,
40)
42if TYPE_CHECKING:
43 from .file import _GitFile
45from .errors import PackedRefsException, RefFormatError
46from .file import GitFile, ensure_dir_exists
47from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
48from .pack import ObjectContainer
50Ref = bytes
52HEADREF = b"HEAD"
53SYMREF = b"ref: "
54LOCAL_BRANCH_PREFIX = b"refs/heads/"
55LOCAL_TAG_PREFIX = b"refs/tags/"
56LOCAL_REMOTE_PREFIX = b"refs/remotes/"
57LOCAL_NOTES_PREFIX = b"refs/notes/"
58BAD_REF_CHARS = set(b"\177 ~^:?*[")
59PEELED_TAG_SUFFIX = b"^{}"
61# For backwards compatibility
62ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
65class SymrefLoop(Exception):
66 """There is a loop between one or more symrefs."""
68 def __init__(self, ref: bytes, depth: int) -> None:
69 """Initialize SymrefLoop exception."""
70 self.ref = ref
71 self.depth = depth
74def parse_symref_value(contents: bytes) -> bytes:
75 """Parse a symref value.
77 Args:
78 contents: Contents to parse
79 Returns: Destination
80 """
81 if contents.startswith(SYMREF):
82 return contents[len(SYMREF) :].rstrip(b"\r\n")
83 raise ValueError(contents)
86def check_ref_format(refname: Ref) -> bool:
87 """Check if a refname is correctly formatted.
89 Implements all the same rules as git-check-ref-format[1].
91 [1]
92 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
94 Args:
95 refname: The refname to check
96 Returns: True if refname is valid, False otherwise
97 """
98 # These could be combined into one big expression, but are listed
99 # separately to parallel [1].
100 if b"/." in refname or refname.startswith(b"."):
101 return False
102 if b"/" not in refname:
103 return False
104 if b".." in refname:
105 return False
106 for i, c in enumerate(refname):
107 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
108 return False
109 if refname[-1] in b"/.":
110 return False
111 if refname.endswith(b".lock"):
112 return False
113 if b"@{" in refname:
114 return False
115 if b"\\" in refname:
116 return False
117 return True
120def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
121 """Parse a remote ref into remote name and branch name.
123 Args:
124 ref: Remote ref like b"refs/remotes/origin/main"
126 Returns:
127 Tuple of (remote_name, branch_name)
129 Raises:
130 ValueError: If ref is not a valid remote ref
131 """
132 if not ref.startswith(LOCAL_REMOTE_PREFIX):
133 raise ValueError(f"Not a remote ref: {ref!r}")
135 # Remove the prefix
136 remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
138 # Split into remote name and branch name
139 parts = remainder.split(b"/", 1)
140 if len(parts) != 2:
141 raise ValueError(f"Invalid remote ref format: {ref!r}")
143 remote_name, branch_name = parts
144 return (remote_name, branch_name)
147class RefsContainer:
148 """A container for refs."""
150 def __init__(
151 self,
152 logger: Optional[
153 Callable[
154 [
155 bytes,
156 Optional[bytes],
157 Optional[bytes],
158 Optional[bytes],
159 Optional[int],
160 Optional[int],
161 Optional[bytes],
162 ],
163 None,
164 ]
165 ] = None,
166 ) -> None:
167 """Initialize RefsContainer with optional logger function."""
168 self._logger = logger
170 def _log(
171 self,
172 ref: bytes,
173 old_sha: Optional[bytes],
174 new_sha: Optional[bytes],
175 committer: Optional[bytes] = None,
176 timestamp: Optional[int] = None,
177 timezone: Optional[int] = None,
178 message: Optional[bytes] = None,
179 ) -> None:
180 if self._logger is None:
181 return
182 if message is None:
183 return
184 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
186 def set_symbolic_ref(
187 self,
188 name: bytes,
189 other: bytes,
190 committer: Optional[bytes] = None,
191 timestamp: Optional[int] = None,
192 timezone: Optional[int] = None,
193 message: Optional[bytes] = None,
194 ) -> None:
195 """Make a ref point at another ref.
197 Args:
198 name: Name of the ref to set
199 other: Name of the ref to point at
200 committer: Optional committer name/email
201 timestamp: Optional timestamp
202 timezone: Optional timezone
203 message: Optional message
204 """
205 raise NotImplementedError(self.set_symbolic_ref)
207 def get_packed_refs(self) -> dict[Ref, ObjectID]:
208 """Get contents of the packed-refs file.
210 Returns: Dictionary mapping ref names to SHA1s
212 Note: Will return an empty dictionary when no packed-refs file is
213 present.
214 """
215 raise NotImplementedError(self.get_packed_refs)
217 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
218 """Add the given refs as packed refs.
220 Args:
221 new_refs: A mapping of ref names to targets; if a target is None that
222 means remove the ref
223 """
224 raise NotImplementedError(self.add_packed_refs)
226 def get_peeled(self, name: bytes) -> Optional[ObjectID]:
227 """Return the cached peeled value of a ref, if available.
229 Args:
230 name: Name of the ref to peel
231 Returns: The peeled value of the ref. If the ref is known not point to
232 a tag, this will be the SHA the ref refers to. If the ref may point
233 to a tag, but no cached information is available, None is returned.
234 """
235 return None
237 def import_refs(
238 self,
239 base: Ref,
240 other: dict[Ref, ObjectID],
241 committer: Optional[bytes] = None,
242 timestamp: Optional[bytes] = None,
243 timezone: Optional[bytes] = None,
244 message: Optional[bytes] = None,
245 prune: bool = False,
246 ) -> None:
247 """Import refs from another repository.
249 Args:
250 base: Base ref to import into (e.g., b'refs/remotes/origin')
251 other: Dictionary of refs to import
252 committer: Optional committer for reflog
253 timestamp: Optional timestamp for reflog
254 timezone: Optional timezone for reflog
255 message: Optional message for reflog
256 prune: If True, remove refs not in other
257 """
258 if prune:
259 to_delete = set(self.subkeys(base))
260 else:
261 to_delete = set()
262 for name, value in other.items():
263 if value is None:
264 to_delete.add(name)
265 else:
266 self.set_if_equals(
267 b"/".join((base, name)), None, value, message=message
268 )
269 if to_delete:
270 try:
271 to_delete.remove(name)
272 except KeyError:
273 pass
274 for ref in to_delete:
275 self.remove_if_equals(b"/".join((base, ref)), None, message=message)
277 def allkeys(self) -> set[Ref]:
278 """All refs present in this container."""
279 raise NotImplementedError(self.allkeys)
281 def __iter__(self) -> Iterator[Ref]:
282 """Iterate over all reference keys."""
283 return iter(self.allkeys())
285 def keys(self, base=None):
286 """Refs present in this container.
288 Args:
289 base: An optional base to return refs under.
290 Returns: An unsorted set of valid refs in this container, including
291 packed refs.
292 """
293 if base is not None:
294 return self.subkeys(base)
295 else:
296 return self.allkeys()
298 def subkeys(self, base: bytes) -> set[bytes]:
299 """Refs present in this container under a base.
301 Args:
302 base: The base to return refs under.
303 Returns: A set of valid refs in this container under the base; the base
304 prefix is stripped from the ref names returned.
305 """
306 keys = set()
307 base_len = len(base) + 1
308 for refname in self.allkeys():
309 if refname.startswith(base):
310 keys.add(refname[base_len:])
311 return keys
313 def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
314 """Return the contents of this container as a dictionary."""
315 ret = {}
316 keys = self.keys(base)
317 if base is None:
318 base = b""
319 else:
320 base = base.rstrip(b"/")
321 for key in keys:
322 try:
323 ret[key] = self[(base + b"/" + key).strip(b"/")]
324 except (SymrefLoop, KeyError):
325 continue # Unable to resolve
327 return ret
329 def _check_refname(self, name: bytes) -> None:
330 """Ensure a refname is valid and lives in refs or is HEAD.
332 HEAD is not a valid refname according to git-check-ref-format, but this
333 class needs to be able to touch HEAD. Also, check_ref_format expects
334 refnames without the leading 'refs/', but this class requires that
335 so it cannot touch anything outside the refs dir (or HEAD).
337 Args:
338 name: The name of the reference.
340 Raises:
341 KeyError: if a refname is not HEAD or is otherwise not valid.
342 """
343 if name in (HEADREF, b"refs/stash"):
344 return
345 if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
346 raise RefFormatError(name)
348 def read_ref(self, refname: bytes) -> Optional[bytes]:
349 """Read a reference without following any references.
351 Args:
352 refname: The name of the reference
353 Returns: The contents of the ref file, or None if it does
354 not exist.
355 """
356 contents = self.read_loose_ref(refname)
357 if not contents:
358 contents = self.get_packed_refs().get(refname, None)
359 return contents
361 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
362 """Read a loose reference and return its contents.
364 Args:
365 name: the refname to read
366 Returns: The contents of the ref file, or None if it does
367 not exist.
368 """
369 raise NotImplementedError(self.read_loose_ref)
371 def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
372 """Follow a reference name.
374 Returns: a tuple of (refnames, sha), wheres refnames are the names of
375 references in the chain
376 """
377 contents: Optional[bytes] = SYMREF + name
378 depth = 0
379 refnames = []
380 while contents and contents.startswith(SYMREF):
381 refname = contents[len(SYMREF) :]
382 refnames.append(refname)
383 contents = self.read_ref(refname)
384 if not contents:
385 break
386 depth += 1
387 if depth > 5:
388 raise SymrefLoop(name, depth)
389 return refnames, contents
391 def __contains__(self, refname: bytes) -> bool:
392 """Check if a reference exists."""
393 if self.read_ref(refname):
394 return True
395 return False
397 def __getitem__(self, name: bytes) -> ObjectID:
398 """Get the SHA1 for a reference name.
400 This method follows all symbolic references.
401 """
402 _, sha = self.follow(name)
403 if sha is None:
404 raise KeyError(name)
405 return sha
407 def set_if_equals(
408 self,
409 name: bytes,
410 old_ref: Optional[bytes],
411 new_ref: bytes,
412 committer: Optional[bytes] = None,
413 timestamp: Optional[int] = None,
414 timezone: Optional[int] = None,
415 message: Optional[bytes] = None,
416 ) -> bool:
417 """Set a refname to new_ref only if it currently equals old_ref.
419 This method follows all symbolic references if applicable for the
420 subclass, and can be used to perform an atomic compare-and-swap
421 operation.
423 Args:
424 name: The refname to set.
425 old_ref: The old sha the refname must refer to, or None to set
426 unconditionally.
427 new_ref: The new sha the refname will refer to.
428 committer: Optional committer name/email
429 timestamp: Optional timestamp
430 timezone: Optional timezone
431 message: Message for reflog
432 Returns: True if the set was successful, False otherwise.
433 """
434 raise NotImplementedError(self.set_if_equals)
436 def add_if_new(
437 self,
438 name: bytes,
439 ref: bytes,
440 committer: Optional[bytes] = None,
441 timestamp: Optional[int] = None,
442 timezone: Optional[int] = None,
443 message: Optional[bytes] = None,
444 ) -> bool:
445 """Add a new reference only if it does not already exist.
447 Args:
448 name: Ref name
449 ref: Ref value
450 committer: Optional committer name/email
451 timestamp: Optional timestamp
452 timezone: Optional timezone
453 message: Optional message for reflog
454 """
455 raise NotImplementedError(self.add_if_new)
457 def __setitem__(self, name: bytes, ref: bytes) -> None:
458 """Set a reference name to point to the given SHA1.
460 This method follows all symbolic references if applicable for the
461 subclass.
463 Note: This method unconditionally overwrites the contents of a
464 reference. To update atomically only if the reference has not
465 changed, use set_if_equals().
467 Args:
468 name: The refname to set.
469 ref: The new sha the refname will refer to.
470 """
471 if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
472 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
473 self.set_if_equals(name, None, ref)
475 def remove_if_equals(
476 self,
477 name: bytes,
478 old_ref: Optional[bytes],
479 committer: Optional[bytes] = None,
480 timestamp: Optional[int] = None,
481 timezone: Optional[int] = None,
482 message: Optional[bytes] = None,
483 ) -> bool:
484 """Remove a refname only if it currently equals old_ref.
486 This method does not follow symbolic references, even if applicable for
487 the subclass. It can be used to perform an atomic compare-and-delete
488 operation.
490 Args:
491 name: The refname to delete.
492 old_ref: The old sha the refname must refer to, or None to
493 delete unconditionally.
494 committer: Optional committer name/email
495 timestamp: Optional timestamp
496 timezone: Optional timezone
497 message: Message for reflog
498 Returns: True if the delete was successful, False otherwise.
499 """
500 raise NotImplementedError(self.remove_if_equals)
502 def __delitem__(self, name: bytes) -> None:
503 """Remove a refname.
505 This method does not follow symbolic references, even if applicable for
506 the subclass.
508 Note: This method unconditionally deletes the contents of a reference.
509 To delete atomically only if the reference has not changed, use
510 remove_if_equals().
512 Args:
513 name: The refname to delete.
514 """
515 self.remove_if_equals(name, None)
517 def get_symrefs(self) -> dict[bytes, bytes]:
518 """Get a dict with all symrefs in this container.
520 Returns: Dictionary mapping source ref to target ref
521 """
522 ret = {}
523 for src in self.allkeys():
524 try:
525 ref_value = self.read_ref(src)
526 assert ref_value is not None
527 dst = parse_symref_value(ref_value)
528 except ValueError:
529 pass
530 else:
531 ret[src] = dst
532 return ret
534 def pack_refs(self, all: bool = False) -> None:
535 """Pack loose refs into packed-refs file.
537 Args:
538 all: If True, pack all refs. If False, only pack tags.
539 """
540 raise NotImplementedError(self.pack_refs)
543class DictRefsContainer(RefsContainer):
544 """RefsContainer backed by a simple dict.
546 This container does not support symbolic or packed references and is not
547 threadsafe.
548 """
550 def __init__(
551 self,
552 refs: dict[bytes, bytes],
553 logger: Optional[
554 Callable[
555 [
556 bytes,
557 Optional[bytes],
558 Optional[bytes],
559 Optional[bytes],
560 Optional[int],
561 Optional[int],
562 Optional[bytes],
563 ],
564 None,
565 ]
566 ] = None,
567 ) -> None:
568 """Initialize DictRefsContainer with refs dictionary and optional logger."""
569 super().__init__(logger=logger)
570 self._refs = refs
571 self._peeled: dict[bytes, ObjectID] = {}
572 self._watchers: set[Any] = set()
574 def allkeys(self) -> set[bytes]:
575 """Return all reference keys."""
576 return set(self._refs.keys())
578 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
579 """Read a loose reference."""
580 return self._refs.get(name, None)
582 def get_packed_refs(self) -> dict[bytes, bytes]:
583 """Get packed references."""
584 return {}
586 def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
587 for watcher in self._watchers:
588 watcher._notify((ref, newsha))
590 def set_symbolic_ref(
591 self,
592 name: Ref,
593 other: Ref,
594 committer: Optional[bytes] = None,
595 timestamp: Optional[int] = None,
596 timezone: Optional[int] = None,
597 message: Optional[bytes] = None,
598 ) -> None:
599 """Make a ref point at another ref.
601 Args:
602 name: Name of the ref to set
603 other: Name of the ref to point at
604 committer: Optional committer name for reflog
605 timestamp: Optional timestamp for reflog
606 timezone: Optional timezone for reflog
607 message: Optional message for reflog
608 """
609 old = self.follow(name)[-1]
610 new = SYMREF + other
611 self._refs[name] = new
612 self._notify(name, new)
613 self._log(
614 name,
615 old,
616 new,
617 committer=committer,
618 timestamp=timestamp,
619 timezone=timezone,
620 message=message,
621 )
623 def set_if_equals(
624 self,
625 name: bytes,
626 old_ref: Optional[bytes],
627 new_ref: bytes,
628 committer: Optional[bytes] = None,
629 timestamp: Optional[int] = None,
630 timezone: Optional[int] = None,
631 message: Optional[bytes] = None,
632 ) -> bool:
633 """Set a refname to new_ref only if it currently equals old_ref.
635 This method follows all symbolic references, and can be used to perform
636 an atomic compare-and-swap operation.
638 Args:
639 name: The refname to set.
640 old_ref: The old sha the refname must refer to, or None to set
641 unconditionally.
642 new_ref: The new sha the refname will refer to.
643 committer: Optional committer name for reflog
644 timestamp: Optional timestamp for reflog
645 timezone: Optional timezone for reflog
646 message: Optional message for reflog
648 Returns:
649 True if the set was successful, False otherwise.
650 """
651 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
652 return False
653 # Only update the specific ref requested, not the whole chain
654 self._check_refname(name)
655 old = self._refs.get(name)
656 self._refs[name] = new_ref
657 self._notify(name, new_ref)
658 self._log(
659 name,
660 old,
661 new_ref,
662 committer=committer,
663 timestamp=timestamp,
664 timezone=timezone,
665 message=message,
666 )
667 return True
669 def add_if_new(
670 self,
671 name: Ref,
672 ref: ObjectID,
673 committer: Optional[bytes] = None,
674 timestamp: Optional[int] = None,
675 timezone: Optional[int] = None,
676 message: Optional[bytes] = None,
677 ) -> bool:
678 """Add a new reference only if it does not already exist.
680 Args:
681 name: Ref name
682 ref: Ref value
683 committer: Optional committer name for reflog
684 timestamp: Optional timestamp for reflog
685 timezone: Optional timezone for reflog
686 message: Optional message for reflog
688 Returns:
689 True if the add was successful, False otherwise.
690 """
691 if name in self._refs:
692 return False
693 self._refs[name] = ref
694 self._notify(name, ref)
695 self._log(
696 name,
697 None,
698 ref,
699 committer=committer,
700 timestamp=timestamp,
701 timezone=timezone,
702 message=message,
703 )
704 return True
706 def remove_if_equals(
707 self,
708 name: bytes,
709 old_ref: Optional[bytes],
710 committer: Optional[bytes] = None,
711 timestamp: Optional[int] = None,
712 timezone: Optional[int] = None,
713 message: Optional[bytes] = None,
714 ) -> bool:
715 """Remove a refname only if it currently equals old_ref.
717 This method does not follow symbolic references. It can be used to
718 perform an atomic compare-and-delete operation.
720 Args:
721 name: The refname to delete.
722 old_ref: The old sha the refname must refer to, or None to
723 delete unconditionally.
724 committer: Optional committer name for reflog
725 timestamp: Optional timestamp for reflog
726 timezone: Optional timezone for reflog
727 message: Optional message for reflog
729 Returns:
730 True if the delete was successful, False otherwise.
731 """
732 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
733 return False
734 try:
735 old = self._refs.pop(name)
736 except KeyError:
737 pass
738 else:
739 self._notify(name, None)
740 self._log(
741 name,
742 old,
743 None,
744 committer=committer,
745 timestamp=timestamp,
746 timezone=timezone,
747 message=message,
748 )
749 return True
751 def get_peeled(self, name: bytes) -> Optional[bytes]:
752 """Get peeled version of a reference."""
753 return self._peeled.get(name)
755 def _update(self, refs: dict[bytes, bytes]) -> None:
756 """Update multiple refs; intended only for testing."""
757 # TODO(dborowitz): replace this with a public function that uses
758 # set_if_equal.
759 for ref, sha in refs.items():
760 self.set_if_equals(ref, None, sha)
762 def _update_peeled(self, peeled: dict[bytes, bytes]) -> None:
763 """Update cached peeled refs; intended only for testing."""
764 self._peeled.update(peeled)
767class InfoRefsContainer(RefsContainer):
768 """Refs container that reads refs from a info/refs file."""
770 def __init__(self, f: BinaryIO) -> None:
771 """Initialize InfoRefsContainer from info/refs file."""
772 self._refs: dict[bytes, bytes] = {}
773 self._peeled: dict[bytes, bytes] = {}
774 refs = read_info_refs(f)
775 (self._refs, self._peeled) = split_peeled_refs(refs)
777 def allkeys(self) -> set[bytes]:
778 """Return all reference keys."""
779 return set(self._refs.keys())
781 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
782 """Read a loose reference."""
783 return self._refs.get(name, None)
785 def get_packed_refs(self) -> dict[bytes, bytes]:
786 """Get packed references."""
787 return {}
789 def get_peeled(self, name: bytes) -> Optional[bytes]:
790 """Get peeled version of a reference."""
791 try:
792 return self._peeled[name]
793 except KeyError:
794 return self._refs[name]
797class DiskRefsContainer(RefsContainer):
798 """Refs container that reads refs from disk."""
800 def __init__(
801 self,
802 path: Union[str, bytes, os.PathLike],
803 worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
804 logger: Optional[
805 Callable[
806 [
807 bytes,
808 Optional[bytes],
809 Optional[bytes],
810 Optional[bytes],
811 Optional[int],
812 Optional[int],
813 Optional[bytes],
814 ],
815 None,
816 ]
817 ] = None,
818 ) -> None:
819 """Initialize DiskRefsContainer."""
820 super().__init__(logger=logger)
821 # Convert path-like objects to strings, then to bytes for Git compatibility
822 self.path = os.fsencode(os.fspath(path))
823 if worktree_path is None:
824 self.worktree_path = self.path
825 else:
826 self.worktree_path = os.fsencode(os.fspath(worktree_path))
827 self._packed_refs: Optional[dict[bytes, bytes]] = None
828 self._peeled_refs: Optional[dict[bytes, bytes]] = None
830 def __repr__(self) -> str:
831 """Return string representation of DiskRefsContainer."""
832 return f"{self.__class__.__name__}({self.path!r})"
834 def _iter_dir(
835 self,
836 path: bytes,
837 base: bytes,
838 dir_filter: Optional[Callable[[bytes], bool]] = None,
839 ) -> Iterator[bytes]:
840 refspath = os.path.join(path, base.rstrip(b"/"))
841 prefix_len = len(os.path.join(path, b""))
843 for root, dirs, files in os.walk(refspath):
844 directory = root[prefix_len:]
845 if os.path.sep != "/":
846 directory = directory.replace(os.fsencode(os.path.sep), b"/")
847 if dir_filter is not None:
848 dirs[:] = [
849 d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
850 ]
852 for filename in files:
853 refname = b"/".join([directory, filename])
854 if check_ref_format(refname):
855 yield refname
857 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
858 base = base.rstrip(b"/") + b"/"
859 search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]] = []
860 if base != b"refs/":
861 path = self.worktree_path if is_per_worktree_ref(base) else self.path
862 search_paths.append((path, None))
863 elif self.worktree_path == self.path:
864 # Iterate through all the refs from the main worktree
865 search_paths.append((self.path, None))
866 else:
867 # Iterate through all the shared refs from the commondir, excluding per-worktree refs
868 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
869 # Iterate through all the per-worktree refs from the worktree's gitdir
870 search_paths.append((self.worktree_path, is_per_worktree_ref))
872 for path, dir_filter in search_paths:
873 yield from self._iter_dir(path, base, dir_filter=dir_filter)
875 def subkeys(self, base: bytes) -> set[bytes]:
876 """Return subkeys under a given base reference path."""
877 subkeys = set()
879 for key in self._iter_loose_refs(base):
880 if key.startswith(base):
881 subkeys.add(key[len(base) :].strip(b"/"))
883 for key in self.get_packed_refs():
884 if key.startswith(base):
885 subkeys.add(key[len(base) :].strip(b"/"))
886 return subkeys
888 def allkeys(self) -> set[bytes]:
889 """Return all reference keys."""
890 allkeys = set()
891 if os.path.exists(self.refpath(HEADREF)):
892 allkeys.add(HEADREF)
894 allkeys.update(self._iter_loose_refs())
895 allkeys.update(self.get_packed_refs())
896 return allkeys
898 def refpath(self, name: bytes) -> bytes:
899 """Return the disk path of a ref."""
900 path = name
901 if os.path.sep != "/":
902 path = path.replace(b"/", os.fsencode(os.path.sep))
904 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
905 return os.path.join(root_dir, path)
907 def get_packed_refs(self) -> dict[bytes, bytes]:
908 """Get contents of the packed-refs file.
910 Returns: Dictionary mapping ref names to SHA1s
912 Note: Will return an empty dictionary when no packed-refs file is
913 present.
914 """
915 # TODO: invalidate the cache on repacking
916 if self._packed_refs is None:
917 # set both to empty because we want _peeled_refs to be
918 # None if and only if _packed_refs is also None.
919 self._packed_refs = {}
920 self._peeled_refs = {}
921 path = os.path.join(self.path, b"packed-refs")
922 try:
923 f = GitFile(path, "rb")
924 except FileNotFoundError:
925 return {}
926 with f:
927 first_line = next(iter(f)).rstrip()
928 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
929 for sha, name, peeled in read_packed_refs_with_peeled(f):
930 self._packed_refs[name] = sha
931 if peeled:
932 self._peeled_refs[name] = peeled
933 else:
934 f.seek(0)
935 for sha, name in read_packed_refs(f):
936 self._packed_refs[name] = sha
937 return self._packed_refs
939 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
940 """Add the given refs as packed refs.
942 Args:
943 new_refs: A mapping of ref names to targets; if a target is None that
944 means remove the ref
945 """
946 if not new_refs:
947 return
949 path = os.path.join(self.path, b"packed-refs")
951 with GitFile(path, "wb") as f:
952 # reread cached refs from disk, while holding the lock
953 packed_refs = self.get_packed_refs().copy()
955 for ref, target in new_refs.items():
956 # sanity check
957 if ref == HEADREF:
958 raise ValueError("cannot pack HEAD")
960 # remove any loose refs pointing to this one -- please
961 # note that this bypasses remove_if_equals as we don't
962 # want to affect packed refs in here
963 with suppress(OSError):
964 os.remove(self.refpath(ref))
966 if target is not None:
967 packed_refs[ref] = target
968 else:
969 packed_refs.pop(ref, None)
971 write_packed_refs(f, packed_refs, self._peeled_refs)
973 self._packed_refs = packed_refs
975 def get_peeled(self, name: bytes) -> Optional[bytes]:
976 """Return the cached peeled value of a ref, if available.
978 Args:
979 name: Name of the ref to peel
980 Returns: The peeled value of the ref. If the ref is known not point to
981 a tag, this will be the SHA the ref refers to. If the ref may point
982 to a tag, but no cached information is available, None is returned.
983 """
984 self.get_packed_refs()
985 if (
986 self._peeled_refs is None
987 or self._packed_refs is None
988 or name not in self._packed_refs
989 ):
990 # No cache: no peeled refs were read, or this ref is loose
991 return None
992 if name in self._peeled_refs:
993 return self._peeled_refs[name]
994 else:
995 # Known not peelable
996 return self[name]
998 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
999 """Read a reference file and return its contents.
1001 If the reference file a symbolic reference, only read the first line of
1002 the file. Otherwise, only read the first 40 bytes.
1004 Args:
1005 name: the refname to read, relative to refpath
1006 Returns: The contents of the ref file, or None if the file does not
1007 exist.
1009 Raises:
1010 IOError: if any other error occurs
1011 """
1012 filename = self.refpath(name)
1013 try:
1014 with GitFile(filename, "rb") as f:
1015 header = f.read(len(SYMREF))
1016 if header == SYMREF:
1017 # Read only the first line
1018 return header + next(iter(f)).rstrip(b"\r\n")
1019 else:
1020 # Read only the first 40 bytes
1021 return header + f.read(40 - len(SYMREF))
1022 except (OSError, UnicodeError):
1023 # don't assume anything specific about the error; in
1024 # particular, invalid or forbidden paths can raise weird
1025 # errors depending on the specific operating system
1026 return None
1028 def _remove_packed_ref(self, name: bytes) -> None:
1029 if self._packed_refs is None:
1030 return
1031 filename = os.path.join(self.path, b"packed-refs")
1032 # reread cached refs from disk, while holding the lock
1033 f = GitFile(filename, "wb")
1034 try:
1035 self._packed_refs = None
1036 self.get_packed_refs()
1038 if self._packed_refs is None or name not in self._packed_refs:
1039 f.abort()
1040 return
1042 del self._packed_refs[name]
1043 if self._peeled_refs is not None:
1044 with suppress(KeyError):
1045 del self._peeled_refs[name]
1046 write_packed_refs(f, self._packed_refs, self._peeled_refs)
1047 f.close()
1048 except BaseException:
1049 f.abort()
1050 raise
1052 def set_symbolic_ref(
1053 self,
1054 name: bytes,
1055 other: bytes,
1056 committer: Optional[bytes] = None,
1057 timestamp: Optional[int] = None,
1058 timezone: Optional[int] = None,
1059 message: Optional[bytes] = None,
1060 ) -> None:
1061 """Make a ref point at another ref.
1063 Args:
1064 name: Name of the ref to set
1065 other: Name of the ref to point at
1066 committer: Optional committer name
1067 timestamp: Optional timestamp
1068 timezone: Optional timezone
1069 message: Optional message to describe the change
1070 """
1071 self._check_refname(name)
1072 self._check_refname(other)
1073 filename = self.refpath(name)
1074 f = GitFile(filename, "wb")
1075 try:
1076 f.write(SYMREF + other + b"\n")
1077 sha = self.follow(name)[-1]
1078 self._log(
1079 name,
1080 sha,
1081 sha,
1082 committer=committer,
1083 timestamp=timestamp,
1084 timezone=timezone,
1085 message=message,
1086 )
1087 except BaseException:
1088 f.abort()
1089 raise
1090 else:
1091 f.close()
1093 def set_if_equals(
1094 self,
1095 name: bytes,
1096 old_ref: Optional[bytes],
1097 new_ref: bytes,
1098 committer: Optional[bytes] = None,
1099 timestamp: Optional[int] = None,
1100 timezone: Optional[int] = None,
1101 message: Optional[bytes] = None,
1102 ) -> bool:
1103 """Set a refname to new_ref only if it currently equals old_ref.
1105 This method follows all symbolic references, and can be used to perform
1106 an atomic compare-and-swap operation.
1108 Args:
1109 name: The refname to set.
1110 old_ref: The old sha the refname must refer to, or None to set
1111 unconditionally.
1112 new_ref: The new sha the refname will refer to.
1113 committer: Optional committer name
1114 timestamp: Optional timestamp
1115 timezone: Optional timezone
1116 message: Set message for reflog
1117 Returns: True if the set was successful, False otherwise.
1118 """
1119 self._check_refname(name)
1120 try:
1121 realnames, _ = self.follow(name)
1122 realname = realnames[-1]
1123 except (KeyError, IndexError, SymrefLoop):
1124 realname = name
1125 filename = self.refpath(realname)
1127 # make sure none of the ancestor folders is in packed refs
1128 probe_ref = os.path.dirname(realname)
1129 packed_refs = self.get_packed_refs()
1130 while probe_ref:
1131 if packed_refs.get(probe_ref, None) is not None:
1132 raise NotADirectoryError(filename)
1133 probe_ref = os.path.dirname(probe_ref)
1135 ensure_dir_exists(os.path.dirname(filename))
1136 with GitFile(filename, "wb") as f:
1137 if old_ref is not None:
1138 try:
1139 # read again while holding the lock to handle race conditions
1140 orig_ref = self.read_loose_ref(realname)
1141 if orig_ref is None:
1142 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
1143 if orig_ref != old_ref:
1144 f.abort()
1145 return False
1146 except OSError:
1147 f.abort()
1148 raise
1150 # Check if ref already has the desired value while holding the lock
1151 # This avoids fsync when ref is unchanged but still detects lock conflicts
1152 current_ref = self.read_loose_ref(realname)
1153 if current_ref is None:
1154 current_ref = packed_refs.get(realname, None)
1156 if current_ref is not None and current_ref == new_ref:
1157 # Ref already has desired value, abort write to avoid fsync
1158 f.abort()
1159 return True
1161 try:
1162 f.write(new_ref + b"\n")
1163 except OSError:
1164 f.abort()
1165 raise
1166 self._log(
1167 realname,
1168 old_ref,
1169 new_ref,
1170 committer=committer,
1171 timestamp=timestamp,
1172 timezone=timezone,
1173 message=message,
1174 )
1175 return True
1177 def add_if_new(
1178 self,
1179 name: bytes,
1180 ref: bytes,
1181 committer: Optional[bytes] = None,
1182 timestamp: Optional[int] = None,
1183 timezone: Optional[int] = None,
1184 message: Optional[bytes] = None,
1185 ) -> bool:
1186 """Add a new reference only if it does not already exist.
1188 This method follows symrefs, and only ensures that the last ref in the
1189 chain does not exist.
1191 Args:
1192 name: The refname to set.
1193 ref: The new sha the refname will refer to.
1194 committer: Optional committer name
1195 timestamp: Optional timestamp
1196 timezone: Optional timezone
1197 message: Optional message for reflog
1198 Returns: True if the add was successful, False otherwise.
1199 """
1200 try:
1201 realnames, contents = self.follow(name)
1202 if contents is not None:
1203 return False
1204 realname = realnames[-1]
1205 except (KeyError, IndexError):
1206 realname = name
1207 self._check_refname(realname)
1208 filename = self.refpath(realname)
1209 ensure_dir_exists(os.path.dirname(filename))
1210 with GitFile(filename, "wb") as f:
1211 if os.path.exists(filename) or name in self.get_packed_refs():
1212 f.abort()
1213 return False
1214 try:
1215 f.write(ref + b"\n")
1216 except OSError:
1217 f.abort()
1218 raise
1219 else:
1220 self._log(
1221 name,
1222 None,
1223 ref,
1224 committer=committer,
1225 timestamp=timestamp,
1226 timezone=timezone,
1227 message=message,
1228 )
1229 return True
1231 def remove_if_equals(
1232 self,
1233 name: bytes,
1234 old_ref: Optional[bytes],
1235 committer: Optional[bytes] = None,
1236 timestamp: Optional[int] = None,
1237 timezone: Optional[int] = None,
1238 message: Optional[bytes] = None,
1239 ) -> bool:
1240 """Remove a refname only if it currently equals old_ref.
1242 This method does not follow symbolic references. It can be used to
1243 perform an atomic compare-and-delete operation.
1245 Args:
1246 name: The refname to delete.
1247 old_ref: The old sha the refname must refer to, or None to
1248 delete unconditionally.
1249 committer: Optional committer name
1250 timestamp: Optional timestamp
1251 timezone: Optional timezone
1252 message: Optional message
1253 Returns: True if the delete was successful, False otherwise.
1254 """
1255 self._check_refname(name)
1256 filename = self.refpath(name)
1257 ensure_dir_exists(os.path.dirname(filename))
1258 f = GitFile(filename, "wb")
1259 try:
1260 if old_ref is not None:
1261 orig_ref = self.read_loose_ref(name)
1262 if orig_ref is None:
1263 orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
1264 if orig_ref != old_ref:
1265 return False
1267 # remove the reference file itself
1268 try:
1269 found = os.path.lexists(filename)
1270 except OSError:
1271 # may only be packed, or otherwise unstorable
1272 found = False
1274 if found:
1275 os.remove(filename)
1277 self._remove_packed_ref(name)
1278 self._log(
1279 name,
1280 old_ref,
1281 None,
1282 committer=committer,
1283 timestamp=timestamp,
1284 timezone=timezone,
1285 message=message,
1286 )
1287 finally:
1288 # never write, we just wanted the lock
1289 f.abort()
1291 # outside of the lock, clean-up any parent directory that might now
1292 # be empty. this ensures that re-creating a reference of the same
1293 # name of what was previously a directory works as expected
1294 parent = name
1295 while True:
1296 try:
1297 parent, _ = parent.rsplit(b"/", 1)
1298 except ValueError:
1299 break
1301 if parent == b"refs":
1302 break
1303 parent_filename = self.refpath(parent)
1304 try:
1305 os.rmdir(parent_filename)
1306 except OSError:
1307 # this can be caused by the parent directory being
1308 # removed by another process, being not empty, etc.
1309 # in any case, this is non fatal because we already
1310 # removed the reference, just ignore it
1311 break
1313 return True
1315 def pack_refs(self, all: bool = False) -> None:
1316 """Pack loose refs into packed-refs file.
1318 Args:
1319 all: If True, pack all refs. If False, only pack tags.
1320 """
1321 refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
1322 for ref in self.allkeys():
1323 if ref == HEADREF:
1324 # Never pack HEAD
1325 continue
1326 if all or ref.startswith(LOCAL_TAG_PREFIX):
1327 try:
1328 sha = self[ref]
1329 if sha:
1330 refs_to_pack[ref] = sha
1331 except KeyError:
1332 # Broken ref, skip it
1333 pass
1335 if refs_to_pack:
1336 self.add_packed_refs(refs_to_pack)
1339def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
1340 """Split a single ref line into a tuple of SHA1 and name."""
1341 fields = line.rstrip(b"\n\r").split(b" ")
1342 if len(fields) != 2:
1343 raise PackedRefsException(f"invalid ref line {line!r}")
1344 sha, name = fields
1345 if not valid_hexsha(sha):
1346 raise PackedRefsException(f"Invalid hex sha {sha!r}")
1347 if not check_ref_format(name):
1348 raise PackedRefsException(f"invalid ref name {name!r}")
1349 return (sha, name)
1352def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
1353 """Read a packed refs file.
1355 Args:
1356 f: file-like object to read from
1357 Returns: Iterator over tuples with SHA1s and ref names.
1358 """
1359 for line in f:
1360 if line.startswith(b"#"):
1361 # Comment
1362 continue
1363 if line.startswith(b"^"):
1364 raise PackedRefsException("found peeled ref in packed-refs without peeled")
1365 yield _split_ref_line(line)
1368def read_packed_refs_with_peeled(
1369 f: IO[bytes],
1370) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
1371 """Read a packed refs file including peeled refs.
1373 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
1374 with ref names, SHA1s, and peeled SHA1s (or None).
1376 Args:
1377 f: file-like object to read from, seek'ed to the second line
1378 """
1379 last = None
1380 for line in f:
1381 if line[0] == b"#":
1382 continue
1383 line = line.rstrip(b"\r\n")
1384 if line.startswith(b"^"):
1385 if not last:
1386 raise PackedRefsException("unexpected peeled ref line")
1387 if not valid_hexsha(line[1:]):
1388 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
1389 sha, name = _split_ref_line(last)
1390 last = None
1391 yield (sha, name, line[1:])
1392 else:
1393 if last:
1394 sha, name = _split_ref_line(last)
1395 yield (sha, name, None)
1396 last = line
1397 if last:
1398 sha, name = _split_ref_line(last)
1399 yield (sha, name, None)
1402def write_packed_refs(
1403 f: IO[bytes],
1404 packed_refs: dict[bytes, bytes],
1405 peeled_refs: Optional[dict[bytes, bytes]] = None,
1406) -> None:
1407 """Write a packed refs file.
1409 Args:
1410 f: empty file-like object to write to
1411 packed_refs: dict of refname to sha of packed refs to write
1412 peeled_refs: dict of refname to peeled value of sha
1413 """
1414 if peeled_refs is None:
1415 peeled_refs = {}
1416 else:
1417 f.write(b"# pack-refs with: peeled\n")
1418 for refname in sorted(packed_refs.keys()):
1419 f.write(git_line(packed_refs[refname], refname))
1420 if refname in peeled_refs:
1421 f.write(b"^" + peeled_refs[refname] + b"\n")
1424def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
1425 """Read info/refs file.
1427 Args:
1428 f: File-like object to read from
1430 Returns:
1431 Dictionary mapping ref names to SHA1s
1432 """
1433 ret = {}
1434 for line in f.readlines():
1435 (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
1436 ret[name] = sha
1437 return ret
1440def write_info_refs(
1441 refs: dict[bytes, bytes], store: ObjectContainer
1442) -> Iterator[bytes]:
1443 """Generate info refs."""
1444 # TODO: Avoid recursive import :(
1445 from .object_store import peel_sha
1447 for name, sha in sorted(refs.items()):
1448 # get_refs() includes HEAD as a special case, but we don't want to
1449 # advertise it
1450 if name == HEADREF:
1451 continue
1452 try:
1453 o = store[sha]
1454 except KeyError:
1455 continue
1456 unpeeled, peeled = peel_sha(store, sha)
1457 yield o.id + b"\t" + name + b"\n"
1458 if o.id != peeled.id:
1459 yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
1462def is_local_branch(x: bytes) -> bool:
1463 """Check if a ref name is a local branch."""
1464 return x.startswith(LOCAL_BRANCH_PREFIX)
1467T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])
1470def strip_peeled_refs(refs: T) -> T:
1471 """Remove all peeled refs."""
1472 return {
1473 ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
1474 }
1477def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
1478 """Split peeled refs from regular refs."""
1479 peeled: dict[bytes, bytes] = {}
1480 regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)}
1482 for ref, sha in refs.items():
1483 if ref.endswith(PEELED_TAG_SUFFIX):
1484 # Only add to peeled dict if sha is not None
1485 if sha is not None:
1486 peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
1488 return regular, peeled
1491def _set_origin_head(
1492 refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
1493) -> None:
1494 # set refs/remotes/origin/HEAD
1495 origin_base = b"refs/remotes/" + origin + b"/"
1496 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
1497 origin_ref = origin_base + HEADREF
1498 target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
1499 if target_ref in refs:
1500 refs.set_symbolic_ref(origin_ref, target_ref)
1503def _set_default_branch(
1504 refs: RefsContainer,
1505 origin: bytes,
1506 origin_head: Optional[bytes],
1507 branch: bytes,
1508 ref_message: Optional[bytes],
1509) -> bytes:
1510 """Set the default branch."""
1511 origin_base = b"refs/remotes/" + origin + b"/"
1512 if branch:
1513 origin_ref = origin_base + branch
1514 if origin_ref in refs:
1515 local_ref = LOCAL_BRANCH_PREFIX + branch
1516 refs.add_if_new(local_ref, refs[origin_ref], ref_message)
1517 head_ref = local_ref
1518 elif LOCAL_TAG_PREFIX + branch in refs:
1519 head_ref = LOCAL_TAG_PREFIX + branch
1520 else:
1521 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
1522 elif origin_head:
1523 head_ref = origin_head
1524 if origin_head.startswith(LOCAL_BRANCH_PREFIX):
1525 origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
1526 else:
1527 origin_ref = origin_head
1528 try:
1529 refs.add_if_new(head_ref, refs[origin_ref], ref_message)
1530 except KeyError:
1531 pass
1532 else:
1533 raise ValueError("neither origin_head nor branch are provided")
1534 return head_ref
1537def _set_head(
1538 refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
1539) -> Optional[bytes]:
1540 if head_ref.startswith(LOCAL_TAG_PREFIX):
1541 # detach HEAD at specified tag
1542 head = refs[head_ref]
1543 if isinstance(head, Tag):
1544 _cls, obj = head.object
1545 head = obj.get_object(obj).id
1546 del refs[HEADREF]
1547 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1548 else:
1549 # set HEAD to specific branch
1550 try:
1551 head = refs[head_ref]
1552 refs.set_symbolic_ref(HEADREF, head_ref)
1553 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1554 except KeyError:
1555 head = None
1556 return head
1559def _import_remote_refs(
1560 refs_container: RefsContainer,
1561 remote_name: str,
1562 refs: dict[bytes, Optional[bytes]],
1563 message: Optional[bytes] = None,
1564 prune: bool = False,
1565 prune_tags: bool = False,
1566) -> None:
1567 stripped_refs = strip_peeled_refs(refs)
1568 branches = {
1569 n[len(LOCAL_BRANCH_PREFIX) :]: v
1570 for (n, v) in stripped_refs.items()
1571 if n.startswith(LOCAL_BRANCH_PREFIX) and v is not None
1572 }
1573 refs_container.import_refs(
1574 b"refs/remotes/" + remote_name.encode(),
1575 branches,
1576 message=message,
1577 prune=prune,
1578 )
1579 tags = {
1580 n[len(LOCAL_TAG_PREFIX) :]: v
1581 for (n, v) in stripped_refs.items()
1582 if n.startswith(LOCAL_TAG_PREFIX)
1583 and not n.endswith(PEELED_TAG_SUFFIX)
1584 and v is not None
1585 }
1586 refs_container.import_refs(
1587 LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
1588 )
1591def serialize_refs(
1592 store: ObjectContainer, refs: dict[bytes, bytes]
1593) -> dict[bytes, bytes]:
1594 """Serialize refs with peeled refs.
1596 Args:
1597 store: Object store to peel refs from
1598 refs: Dictionary of ref names to SHAs
1600 Returns:
1601 Dictionary with refs and peeled refs (marked with ^{})
1602 """
1603 # TODO: Avoid recursive import :(
1604 from .object_store import peel_sha
1606 ret = {}
1607 for ref, sha in refs.items():
1608 try:
1609 unpeeled, peeled = peel_sha(store, sha)
1610 except KeyError:
1611 warnings.warn(
1612 "ref {} points at non-present sha {}".format(
1613 ref.decode("utf-8", "replace"), sha.decode("ascii")
1614 ),
1615 UserWarning,
1616 )
1617 continue
1618 else:
1619 if isinstance(unpeeled, Tag):
1620 ret[ref + PEELED_TAG_SUFFIX] = peeled.id
1621 ret[ref] = unpeeled.id
1622 return ret
1625class locked_ref:
1626 """Lock a ref while making modifications.
1628 Works as a context manager.
1629 """
1631 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
1632 """Initialize a locked ref.
1634 Args:
1635 refs_container: The DiskRefsContainer to lock the ref in
1636 refname: The ref name to lock
1637 """
1638 self._refs_container = refs_container
1639 self._refname = refname
1640 self._file: Optional[_GitFile] = None
1641 self._realname: Optional[Ref] = None
1642 self._deleted = False
1644 def __enter__(self) -> "locked_ref":
1645 """Enter the context manager and acquire the lock.
1647 Returns:
1648 This locked_ref instance
1650 Raises:
1651 OSError: If the lock cannot be acquired
1652 """
1653 self._refs_container._check_refname(self._refname)
1654 try:
1655 realnames, _ = self._refs_container.follow(self._refname)
1656 self._realname = realnames[-1]
1657 except (KeyError, IndexError, SymrefLoop):
1658 self._realname = self._refname
1660 filename = self._refs_container.refpath(self._realname)
1661 ensure_dir_exists(os.path.dirname(filename))
1662 f = GitFile(filename, "wb")
1663 self._file = f
1664 return self
1666 def __exit__(
1667 self,
1668 exc_type: Optional[type],
1669 exc_value: Optional[BaseException],
1670 traceback: Optional[types.TracebackType],
1671 ) -> None:
1672 """Exit the context manager and release the lock.
1674 Args:
1675 exc_type: Type of exception if one occurred
1676 exc_value: Exception instance if one occurred
1677 traceback: Traceback if an exception occurred
1678 """
1679 if self._file:
1680 if exc_type is not None or self._deleted:
1681 self._file.abort()
1682 else:
1683 self._file.close()
1685 def get(self) -> Optional[bytes]:
1686 """Get the current value of the ref."""
1687 if not self._file:
1688 raise RuntimeError("locked_ref not in context")
1690 assert self._realname is not None
1691 current_ref = self._refs_container.read_loose_ref(self._realname)
1692 if current_ref is None:
1693 current_ref = self._refs_container.get_packed_refs().get(
1694 self._realname, None
1695 )
1696 return current_ref
1698 def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
1699 """Ensure the ref currently equals the expected value.
1701 Args:
1702 expected_value: The expected current value of the ref
1703 Returns:
1704 True if the ref equals the expected value, False otherwise
1705 """
1706 current_value = self.get()
1707 return current_value == expected_value
1709 def set(self, new_ref: bytes) -> None:
1710 """Set the ref to a new value.
1712 Args:
1713 new_ref: The new SHA1 or symbolic ref value
1714 """
1715 if not self._file:
1716 raise RuntimeError("locked_ref not in context")
1718 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
1719 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
1721 self._file.seek(0)
1722 self._file.truncate()
1723 self._file.write(new_ref + b"\n")
1724 self._deleted = False
1726 def set_symbolic_ref(self, target: Ref) -> None:
1727 """Make this ref point at another ref.
1729 Args:
1730 target: Name of the ref to point at
1731 """
1732 if not self._file:
1733 raise RuntimeError("locked_ref not in context")
1735 self._refs_container._check_refname(target)
1736 self._file.seek(0)
1737 self._file.truncate()
1738 self._file.write(SYMREF + target + b"\n")
1739 self._deleted = False
1741 def delete(self) -> None:
1742 """Delete the ref file while holding the lock."""
1743 if not self._file:
1744 raise RuntimeError("locked_ref not in context")
1746 # Delete the actual ref file while holding the lock
1747 if self._realname:
1748 filename = self._refs_container.refpath(self._realname)
1749 try:
1750 if os.path.lexists(filename):
1751 os.remove(filename)
1752 except FileNotFoundError:
1753 pass
1754 self._refs_container._remove_packed_ref(self._realname)
1756 self._deleted = True
1759def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
1760 """Filter refs to only include those with a given prefix.
1762 Args:
1763 refs: A dictionary of refs.
1764 prefixes: The prefixes to filter by.
1765 """
1766 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
1767 return cast(T, filtered)
1770def is_per_worktree_ref(ref: bytes) -> bool:
1771 """Returns whether a reference is stored per worktree or not.
1773 Per-worktree references are:
1774 - all pseudorefs, e.g. HEAD
1775 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
1777 All refs starting with "refs/" are shared, except for the ones listed above.
1779 See https://git-scm.com/docs/git-worktree#_refs.
1780 """
1781 return not ref.startswith(b"refs/") or ref.startswith(
1782 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
1783 )