Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import types
27import warnings
28from collections.abc import Iterable, Iterator, Mapping
29from contextlib import suppress
30from typing import (
31 IO,
32 TYPE_CHECKING,
33 Any,
34 BinaryIO,
35 Callable,
36 Optional,
37 TypeVar,
38 Union,
39)
41if TYPE_CHECKING:
42 from .file import _GitFile
44from .errors import PackedRefsException, RefFormatError
45from .file import GitFile, ensure_dir_exists
46from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
47from .pack import ObjectContainer
49Ref = bytes
51HEADREF = b"HEAD"
52SYMREF = b"ref: "
53LOCAL_BRANCH_PREFIX = b"refs/heads/"
54LOCAL_TAG_PREFIX = b"refs/tags/"
55LOCAL_REMOTE_PREFIX = b"refs/remotes/"
56LOCAL_NOTES_PREFIX = b"refs/notes/"
57BAD_REF_CHARS = set(b"\177 ~^:?*[")
58PEELED_TAG_SUFFIX = b"^{}"
60# For backwards compatibility
61ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
64class SymrefLoop(Exception):
65 """There is a loop between one or more symrefs."""
67 def __init__(self, ref: bytes, depth: int) -> None:
68 """Initialize SymrefLoop exception."""
69 self.ref = ref
70 self.depth = depth
73def parse_symref_value(contents: bytes) -> bytes:
74 """Parse a symref value.
76 Args:
77 contents: Contents to parse
78 Returns: Destination
79 """
80 if contents.startswith(SYMREF):
81 return contents[len(SYMREF) :].rstrip(b"\r\n")
82 raise ValueError(contents)
85def check_ref_format(refname: Ref) -> bool:
86 """Check if a refname is correctly formatted.
88 Implements all the same rules as git-check-ref-format[1].
90 [1]
91 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
93 Args:
94 refname: The refname to check
95 Returns: True if refname is valid, False otherwise
96 """
97 # These could be combined into one big expression, but are listed
98 # separately to parallel [1].
99 if b"/." in refname or refname.startswith(b"."):
100 return False
101 if b"/" not in refname:
102 return False
103 if b".." in refname:
104 return False
105 for i, c in enumerate(refname):
106 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
107 return False
108 if refname[-1] in b"/.":
109 return False
110 if refname.endswith(b".lock"):
111 return False
112 if b"@{" in refname:
113 return False
114 if b"\\" in refname:
115 return False
116 return True
119def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
120 """Parse a remote ref into remote name and branch name.
122 Args:
123 ref: Remote ref like b"refs/remotes/origin/main"
125 Returns:
126 Tuple of (remote_name, branch_name)
128 Raises:
129 ValueError: If ref is not a valid remote ref
130 """
131 if not ref.startswith(LOCAL_REMOTE_PREFIX):
132 raise ValueError(f"Not a remote ref: {ref!r}")
134 # Remove the prefix
135 remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
137 # Split into remote name and branch name
138 parts = remainder.split(b"/", 1)
139 if len(parts) != 2:
140 raise ValueError(f"Invalid remote ref format: {ref!r}")
142 remote_name, branch_name = parts
143 return (remote_name, branch_name)
146class RefsContainer:
147 """A container for refs."""
149 def __init__(
150 self,
151 logger: Optional[
152 Callable[
153 [
154 bytes,
155 bytes,
156 bytes,
157 Optional[bytes],
158 Optional[int],
159 Optional[int],
160 bytes,
161 ],
162 None,
163 ]
164 ] = None,
165 ) -> None:
166 """Initialize RefsContainer with optional logger function."""
167 self._logger = logger
169 def _log(
170 self,
171 ref: bytes,
172 old_sha: Optional[bytes],
173 new_sha: Optional[bytes],
174 committer: Optional[bytes] = None,
175 timestamp: Optional[int] = None,
176 timezone: Optional[int] = None,
177 message: Optional[bytes] = None,
178 ) -> None:
179 if self._logger is None:
180 return
181 if message is None:
182 return
183 # Use ZERO_SHA for None values, matching git behavior
184 if old_sha is None:
185 old_sha = ZERO_SHA
186 if new_sha is None:
187 new_sha = ZERO_SHA
188 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
190 def set_symbolic_ref(
191 self,
192 name: bytes,
193 other: bytes,
194 committer: Optional[bytes] = None,
195 timestamp: Optional[int] = None,
196 timezone: Optional[int] = None,
197 message: Optional[bytes] = None,
198 ) -> None:
199 """Make a ref point at another ref.
201 Args:
202 name: Name of the ref to set
203 other: Name of the ref to point at
204 committer: Optional committer name/email
205 timestamp: Optional timestamp
206 timezone: Optional timezone
207 message: Optional message
208 """
209 raise NotImplementedError(self.set_symbolic_ref)
211 def get_packed_refs(self) -> dict[Ref, ObjectID]:
212 """Get contents of the packed-refs file.
214 Returns: Dictionary mapping ref names to SHA1s
216 Note: Will return an empty dictionary when no packed-refs file is
217 present.
218 """
219 raise NotImplementedError(self.get_packed_refs)
221 def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
222 """Add the given refs as packed refs.
224 Args:
225 new_refs: A mapping of ref names to targets; if a target is None that
226 means remove the ref
227 """
228 raise NotImplementedError(self.add_packed_refs)
230 def get_peeled(self, name: bytes) -> Optional[ObjectID]:
231 """Return the cached peeled value of a ref, if available.
233 Args:
234 name: Name of the ref to peel
235 Returns: The peeled value of the ref. If the ref is known not point to
236 a tag, this will be the SHA the ref refers to. If the ref may point
237 to a tag, but no cached information is available, None is returned.
238 """
239 return None
241 def import_refs(
242 self,
243 base: Ref,
244 other: Mapping[Ref, ObjectID],
245 committer: Optional[bytes] = None,
246 timestamp: Optional[bytes] = None,
247 timezone: Optional[bytes] = None,
248 message: Optional[bytes] = None,
249 prune: bool = False,
250 ) -> None:
251 """Import refs from another repository.
253 Args:
254 base: Base ref to import into (e.g., b'refs/remotes/origin')
255 other: Dictionary of refs to import
256 committer: Optional committer for reflog
257 timestamp: Optional timestamp for reflog
258 timezone: Optional timezone for reflog
259 message: Optional message for reflog
260 prune: If True, remove refs not in other
261 """
262 if prune:
263 to_delete = set(self.subkeys(base))
264 else:
265 to_delete = set()
266 for name, value in other.items():
267 if value is None:
268 to_delete.add(name)
269 else:
270 self.set_if_equals(
271 b"/".join((base, name)), None, value, message=message
272 )
273 if to_delete:
274 try:
275 to_delete.remove(name)
276 except KeyError:
277 pass
278 for ref in to_delete:
279 self.remove_if_equals(b"/".join((base, ref)), None, message=message)
281 def allkeys(self) -> set[Ref]:
282 """All refs present in this container."""
283 raise NotImplementedError(self.allkeys)
285 def __iter__(self) -> Iterator[Ref]:
286 """Iterate over all reference keys."""
287 return iter(self.allkeys())
289 def keys(self, base: Optional[bytes] = None) -> set[bytes]:
290 """Refs present in this container.
292 Args:
293 base: An optional base to return refs under.
294 Returns: An unsorted set of valid refs in this container, including
295 packed refs.
296 """
297 if base is not None:
298 return self.subkeys(base)
299 else:
300 return self.allkeys()
302 def subkeys(self, base: bytes) -> set[bytes]:
303 """Refs present in this container under a base.
305 Args:
306 base: The base to return refs under.
307 Returns: A set of valid refs in this container under the base; the base
308 prefix is stripped from the ref names returned.
309 """
310 keys = set()
311 base_len = len(base) + 1
312 for refname in self.allkeys():
313 if refname.startswith(base):
314 keys.add(refname[base_len:])
315 return keys
317 def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
318 """Return the contents of this container as a dictionary."""
319 ret = {}
320 keys = self.keys(base)
321 if base is None:
322 base = b""
323 else:
324 base = base.rstrip(b"/")
325 for key in keys:
326 try:
327 ret[key] = self[(base + b"/" + key).strip(b"/")]
328 except (SymrefLoop, KeyError):
329 continue # Unable to resolve
331 return ret
333 def _check_refname(self, name: bytes) -> None:
334 """Ensure a refname is valid and lives in refs or is HEAD.
336 HEAD is not a valid refname according to git-check-ref-format, but this
337 class needs to be able to touch HEAD. Also, check_ref_format expects
338 refnames without the leading 'refs/', but this class requires that
339 so it cannot touch anything outside the refs dir (or HEAD).
341 Args:
342 name: The name of the reference.
344 Raises:
345 KeyError: if a refname is not HEAD or is otherwise not valid.
346 """
347 if name in (HEADREF, b"refs/stash"):
348 return
349 if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
350 raise RefFormatError(name)
352 def read_ref(self, refname: bytes) -> Optional[bytes]:
353 """Read a reference without following any references.
355 Args:
356 refname: The name of the reference
357 Returns: The contents of the ref file, or None if it does
358 not exist.
359 """
360 contents = self.read_loose_ref(refname)
361 if not contents:
362 contents = self.get_packed_refs().get(refname, None)
363 return contents
365 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
366 """Read a loose reference and return its contents.
368 Args:
369 name: the refname to read
370 Returns: The contents of the ref file, or None if it does
371 not exist.
372 """
373 raise NotImplementedError(self.read_loose_ref)
375 def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
376 """Follow a reference name.
378 Returns: a tuple of (refnames, sha), wheres refnames are the names of
379 references in the chain
380 """
381 contents: Optional[bytes] = SYMREF + name
382 depth = 0
383 refnames = []
384 while contents and contents.startswith(SYMREF):
385 refname = contents[len(SYMREF) :]
386 refnames.append(refname)
387 contents = self.read_ref(refname)
388 if not contents:
389 break
390 depth += 1
391 if depth > 5:
392 raise SymrefLoop(name, depth)
393 return refnames, contents
395 def __contains__(self, refname: bytes) -> bool:
396 """Check if a reference exists."""
397 if self.read_ref(refname):
398 return True
399 return False
401 def __getitem__(self, name: bytes) -> ObjectID:
402 """Get the SHA1 for a reference name.
404 This method follows all symbolic references.
405 """
406 _, sha = self.follow(name)
407 if sha is None:
408 raise KeyError(name)
409 return sha
411 def set_if_equals(
412 self,
413 name: bytes,
414 old_ref: Optional[bytes],
415 new_ref: bytes,
416 committer: Optional[bytes] = None,
417 timestamp: Optional[int] = None,
418 timezone: Optional[int] = None,
419 message: Optional[bytes] = None,
420 ) -> bool:
421 """Set a refname to new_ref only if it currently equals old_ref.
423 This method follows all symbolic references if applicable for the
424 subclass, and can be used to perform an atomic compare-and-swap
425 operation.
427 Args:
428 name: The refname to set.
429 old_ref: The old sha the refname must refer to, or None to set
430 unconditionally.
431 new_ref: The new sha the refname will refer to.
432 committer: Optional committer name/email
433 timestamp: Optional timestamp
434 timezone: Optional timezone
435 message: Message for reflog
436 Returns: True if the set was successful, False otherwise.
437 """
438 raise NotImplementedError(self.set_if_equals)
440 def add_if_new(
441 self,
442 name: bytes,
443 ref: bytes,
444 committer: Optional[bytes] = None,
445 timestamp: Optional[int] = None,
446 timezone: Optional[int] = None,
447 message: Optional[bytes] = None,
448 ) -> bool:
449 """Add a new reference only if it does not already exist.
451 Args:
452 name: Ref name
453 ref: Ref value
454 committer: Optional committer name/email
455 timestamp: Optional timestamp
456 timezone: Optional timezone
457 message: Optional message for reflog
458 """
459 raise NotImplementedError(self.add_if_new)
461 def __setitem__(self, name: bytes, ref: bytes) -> None:
462 """Set a reference name to point to the given SHA1.
464 This method follows all symbolic references if applicable for the
465 subclass.
467 Note: This method unconditionally overwrites the contents of a
468 reference. To update atomically only if the reference has not
469 changed, use set_if_equals().
471 Args:
472 name: The refname to set.
473 ref: The new sha the refname will refer to.
474 """
475 if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
476 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
477 self.set_if_equals(name, None, ref)
479 def remove_if_equals(
480 self,
481 name: bytes,
482 old_ref: Optional[bytes],
483 committer: Optional[bytes] = None,
484 timestamp: Optional[int] = None,
485 timezone: Optional[int] = None,
486 message: Optional[bytes] = None,
487 ) -> bool:
488 """Remove a refname only if it currently equals old_ref.
490 This method does not follow symbolic references, even if applicable for
491 the subclass. It can be used to perform an atomic compare-and-delete
492 operation.
494 Args:
495 name: The refname to delete.
496 old_ref: The old sha the refname must refer to, or None to
497 delete unconditionally.
498 committer: Optional committer name/email
499 timestamp: Optional timestamp
500 timezone: Optional timezone
501 message: Message for reflog
502 Returns: True if the delete was successful, False otherwise.
503 """
504 raise NotImplementedError(self.remove_if_equals)
506 def __delitem__(self, name: bytes) -> None:
507 """Remove a refname.
509 This method does not follow symbolic references, even if applicable for
510 the subclass.
512 Note: This method unconditionally deletes the contents of a reference.
513 To delete atomically only if the reference has not changed, use
514 remove_if_equals().
516 Args:
517 name: The refname to delete.
518 """
519 self.remove_if_equals(name, None)
521 def get_symrefs(self) -> dict[bytes, bytes]:
522 """Get a dict with all symrefs in this container.
524 Returns: Dictionary mapping source ref to target ref
525 """
526 ret = {}
527 for src in self.allkeys():
528 try:
529 ref_value = self.read_ref(src)
530 assert ref_value is not None
531 dst = parse_symref_value(ref_value)
532 except ValueError:
533 pass
534 else:
535 ret[src] = dst
536 return ret
538 def pack_refs(self, all: bool = False) -> None:
539 """Pack loose refs into packed-refs file.
541 Args:
542 all: If True, pack all refs. If False, only pack tags.
543 """
544 raise NotImplementedError(self.pack_refs)
547class DictRefsContainer(RefsContainer):
548 """RefsContainer backed by a simple dict.
550 This container does not support symbolic or packed references and is not
551 threadsafe.
552 """
554 def __init__(
555 self,
556 refs: dict[bytes, bytes],
557 logger: Optional[
558 Callable[
559 [
560 bytes,
561 Optional[bytes],
562 Optional[bytes],
563 Optional[bytes],
564 Optional[int],
565 Optional[int],
566 Optional[bytes],
567 ],
568 None,
569 ]
570 ] = None,
571 ) -> None:
572 """Initialize DictRefsContainer with refs dictionary and optional logger."""
573 super().__init__(logger=logger)
574 self._refs = refs
575 self._peeled: dict[bytes, ObjectID] = {}
576 self._watchers: set[Any] = set()
578 def allkeys(self) -> set[bytes]:
579 """Return all reference keys."""
580 return set(self._refs.keys())
582 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
583 """Read a loose reference."""
584 return self._refs.get(name, None)
586 def get_packed_refs(self) -> dict[bytes, bytes]:
587 """Get packed references."""
588 return {}
590 def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
591 for watcher in self._watchers:
592 watcher._notify((ref, newsha))
594 def set_symbolic_ref(
595 self,
596 name: Ref,
597 other: Ref,
598 committer: Optional[bytes] = None,
599 timestamp: Optional[int] = None,
600 timezone: Optional[int] = None,
601 message: Optional[bytes] = None,
602 ) -> None:
603 """Make a ref point at another ref.
605 Args:
606 name: Name of the ref to set
607 other: Name of the ref to point at
608 committer: Optional committer name for reflog
609 timestamp: Optional timestamp for reflog
610 timezone: Optional timezone for reflog
611 message: Optional message for reflog
612 """
613 old = self.follow(name)[-1]
614 new = SYMREF + other
615 self._refs[name] = new
616 self._notify(name, new)
617 self._log(
618 name,
619 old,
620 new,
621 committer=committer,
622 timestamp=timestamp,
623 timezone=timezone,
624 message=message,
625 )
627 def set_if_equals(
628 self,
629 name: bytes,
630 old_ref: Optional[bytes],
631 new_ref: bytes,
632 committer: Optional[bytes] = None,
633 timestamp: Optional[int] = None,
634 timezone: Optional[int] = None,
635 message: Optional[bytes] = None,
636 ) -> bool:
637 """Set a refname to new_ref only if it currently equals old_ref.
639 This method follows all symbolic references, and can be used to perform
640 an atomic compare-and-swap operation.
642 Args:
643 name: The refname to set.
644 old_ref: The old sha the refname must refer to, or None to set
645 unconditionally.
646 new_ref: The new sha the refname will refer to.
647 committer: Optional committer name for reflog
648 timestamp: Optional timestamp for reflog
649 timezone: Optional timezone for reflog
650 message: Optional message for reflog
652 Returns:
653 True if the set was successful, False otherwise.
654 """
655 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
656 return False
657 # Only update the specific ref requested, not the whole chain
658 self._check_refname(name)
659 old = self._refs.get(name)
660 self._refs[name] = new_ref
661 self._notify(name, new_ref)
662 self._log(
663 name,
664 old,
665 new_ref,
666 committer=committer,
667 timestamp=timestamp,
668 timezone=timezone,
669 message=message,
670 )
671 return True
673 def add_if_new(
674 self,
675 name: Ref,
676 ref: ObjectID,
677 committer: Optional[bytes] = None,
678 timestamp: Optional[int] = None,
679 timezone: Optional[int] = None,
680 message: Optional[bytes] = None,
681 ) -> bool:
682 """Add a new reference only if it does not already exist.
684 Args:
685 name: Ref name
686 ref: Ref value
687 committer: Optional committer name for reflog
688 timestamp: Optional timestamp for reflog
689 timezone: Optional timezone for reflog
690 message: Optional message for reflog
692 Returns:
693 True if the add was successful, False otherwise.
694 """
695 if name in self._refs:
696 return False
697 self._refs[name] = ref
698 self._notify(name, ref)
699 self._log(
700 name,
701 None,
702 ref,
703 committer=committer,
704 timestamp=timestamp,
705 timezone=timezone,
706 message=message,
707 )
708 return True
710 def remove_if_equals(
711 self,
712 name: bytes,
713 old_ref: Optional[bytes],
714 committer: Optional[bytes] = None,
715 timestamp: Optional[int] = None,
716 timezone: Optional[int] = None,
717 message: Optional[bytes] = None,
718 ) -> bool:
719 """Remove a refname only if it currently equals old_ref.
721 This method does not follow symbolic references. It can be used to
722 perform an atomic compare-and-delete operation.
724 Args:
725 name: The refname to delete.
726 old_ref: The old sha the refname must refer to, or None to
727 delete unconditionally.
728 committer: Optional committer name for reflog
729 timestamp: Optional timestamp for reflog
730 timezone: Optional timezone for reflog
731 message: Optional message for reflog
733 Returns:
734 True if the delete was successful, False otherwise.
735 """
736 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
737 return False
738 try:
739 old = self._refs.pop(name)
740 except KeyError:
741 pass
742 else:
743 self._notify(name, None)
744 self._log(
745 name,
746 old,
747 None,
748 committer=committer,
749 timestamp=timestamp,
750 timezone=timezone,
751 message=message,
752 )
753 return True
755 def get_peeled(self, name: bytes) -> Optional[bytes]:
756 """Get peeled version of a reference."""
757 return self._peeled.get(name)
759 def _update(self, refs: Mapping[bytes, bytes]) -> None:
760 """Update multiple refs; intended only for testing."""
761 # TODO(dborowitz): replace this with a public function that uses
762 # set_if_equal.
763 for ref, sha in refs.items():
764 self.set_if_equals(ref, None, sha)
766 def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None:
767 """Update cached peeled refs; intended only for testing."""
768 self._peeled.update(peeled)
771class InfoRefsContainer(RefsContainer):
772 """Refs container that reads refs from a info/refs file."""
774 def __init__(self, f: BinaryIO) -> None:
775 """Initialize InfoRefsContainer from info/refs file."""
776 self._refs: dict[bytes, bytes] = {}
777 self._peeled: dict[bytes, bytes] = {}
778 refs = read_info_refs(f)
779 (self._refs, self._peeled) = split_peeled_refs(refs)
781 def allkeys(self) -> set[bytes]:
782 """Return all reference keys."""
783 return set(self._refs.keys())
785 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
786 """Read a loose reference."""
787 return self._refs.get(name, None)
789 def get_packed_refs(self) -> dict[bytes, bytes]:
790 """Get packed references."""
791 return {}
793 def get_peeled(self, name: bytes) -> Optional[bytes]:
794 """Get peeled version of a reference."""
795 try:
796 return self._peeled[name]
797 except KeyError:
798 return self._refs[name]
801class DiskRefsContainer(RefsContainer):
802 """Refs container that reads refs from disk."""
804 def __init__(
805 self,
806 path: Union[str, bytes, os.PathLike[str]],
807 worktree_path: Optional[Union[str, bytes, os.PathLike[str]]] = None,
808 logger: Optional[
809 Callable[
810 [
811 bytes,
812 bytes,
813 bytes,
814 Optional[bytes],
815 Optional[int],
816 Optional[int],
817 bytes,
818 ],
819 None,
820 ]
821 ] = None,
822 ) -> None:
823 """Initialize DiskRefsContainer."""
824 super().__init__(logger=logger)
825 # Convert path-like objects to strings, then to bytes for Git compatibility
826 self.path = os.fsencode(os.fspath(path))
827 if worktree_path is None:
828 self.worktree_path = self.path
829 else:
830 self.worktree_path = os.fsencode(os.fspath(worktree_path))
831 self._packed_refs: Optional[dict[bytes, bytes]] = None
832 self._peeled_refs: Optional[dict[bytes, bytes]] = None
834 def __repr__(self) -> str:
835 """Return string representation of DiskRefsContainer."""
836 return f"{self.__class__.__name__}({self.path!r})"
838 def _iter_dir(
839 self,
840 path: bytes,
841 base: bytes,
842 dir_filter: Optional[Callable[[bytes], bool]] = None,
843 ) -> Iterator[bytes]:
844 refspath = os.path.join(path, base.rstrip(b"/"))
845 prefix_len = len(os.path.join(path, b""))
847 for root, dirs, files in os.walk(refspath):
848 directory = root[prefix_len:]
849 if os.path.sep != "/":
850 directory = directory.replace(os.fsencode(os.path.sep), b"/")
851 if dir_filter is not None:
852 dirs[:] = [
853 d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
854 ]
856 for filename in files:
857 refname = b"/".join([directory, filename])
858 if check_ref_format(refname):
859 yield refname
861 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
862 base = base.rstrip(b"/") + b"/"
863 search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]] = []
864 if base != b"refs/":
865 path = self.worktree_path if is_per_worktree_ref(base) else self.path
866 search_paths.append((path, None))
867 elif self.worktree_path == self.path:
868 # Iterate through all the refs from the main worktree
869 search_paths.append((self.path, None))
870 else:
871 # Iterate through all the shared refs from the commondir, excluding per-worktree refs
872 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
873 # Iterate through all the per-worktree refs from the worktree's gitdir
874 search_paths.append((self.worktree_path, is_per_worktree_ref))
876 for path, dir_filter in search_paths:
877 yield from self._iter_dir(path, base, dir_filter=dir_filter)
879 def subkeys(self, base: bytes) -> set[bytes]:
880 """Return subkeys under a given base reference path."""
881 subkeys = set()
883 for key in self._iter_loose_refs(base):
884 if key.startswith(base):
885 subkeys.add(key[len(base) :].strip(b"/"))
887 for key in self.get_packed_refs():
888 if key.startswith(base):
889 subkeys.add(key[len(base) :].strip(b"/"))
890 return subkeys
892 def allkeys(self) -> set[bytes]:
893 """Return all reference keys."""
894 allkeys = set()
895 if os.path.exists(self.refpath(HEADREF)):
896 allkeys.add(HEADREF)
898 allkeys.update(self._iter_loose_refs())
899 allkeys.update(self.get_packed_refs())
900 return allkeys
902 def refpath(self, name: bytes) -> bytes:
903 """Return the disk path of a ref."""
904 path = name
905 if os.path.sep != "/":
906 path = path.replace(b"/", os.fsencode(os.path.sep))
908 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
909 return os.path.join(root_dir, path)
911 def get_packed_refs(self) -> dict[bytes, bytes]:
912 """Get contents of the packed-refs file.
914 Returns: Dictionary mapping ref names to SHA1s
916 Note: Will return an empty dictionary when no packed-refs file is
917 present.
918 """
919 # TODO: invalidate the cache on repacking
920 if self._packed_refs is None:
921 # set both to empty because we want _peeled_refs to be
922 # None if and only if _packed_refs is also None.
923 self._packed_refs = {}
924 self._peeled_refs = {}
925 path = os.path.join(self.path, b"packed-refs")
926 try:
927 f = GitFile(path, "rb")
928 except FileNotFoundError:
929 return {}
930 with f:
931 first_line = next(iter(f)).rstrip()
932 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
933 for sha, name, peeled in read_packed_refs_with_peeled(f):
934 self._packed_refs[name] = sha
935 if peeled:
936 self._peeled_refs[name] = peeled
937 else:
938 f.seek(0)
939 for sha, name in read_packed_refs(f):
940 self._packed_refs[name] = sha
941 return self._packed_refs
943 def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
944 """Add the given refs as packed refs.
946 Args:
947 new_refs: A mapping of ref names to targets; if a target is None that
948 means remove the ref
949 """
950 if not new_refs:
951 return
953 path = os.path.join(self.path, b"packed-refs")
955 with GitFile(path, "wb") as f:
956 # reread cached refs from disk, while holding the lock
957 packed_refs = self.get_packed_refs().copy()
959 for ref, target in new_refs.items():
960 # sanity check
961 if ref == HEADREF:
962 raise ValueError("cannot pack HEAD")
964 # remove any loose refs pointing to this one -- please
965 # note that this bypasses remove_if_equals as we don't
966 # want to affect packed refs in here
967 with suppress(OSError):
968 os.remove(self.refpath(ref))
970 if target is not None:
971 packed_refs[ref] = target
972 else:
973 packed_refs.pop(ref, None)
975 write_packed_refs(f, packed_refs, self._peeled_refs)
977 self._packed_refs = packed_refs
979 def get_peeled(self, name: bytes) -> Optional[bytes]:
980 """Return the cached peeled value of a ref, if available.
982 Args:
983 name: Name of the ref to peel
984 Returns: The peeled value of the ref. If the ref is known not point to
985 a tag, this will be the SHA the ref refers to. If the ref may point
986 to a tag, but no cached information is available, None is returned.
987 """
988 self.get_packed_refs()
989 if (
990 self._peeled_refs is None
991 or self._packed_refs is None
992 or name not in self._packed_refs
993 ):
994 # No cache: no peeled refs were read, or this ref is loose
995 return None
996 if name in self._peeled_refs:
997 return self._peeled_refs[name]
998 else:
999 # Known not peelable
1000 return self[name]
1002 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
1003 """Read a reference file and return its contents.
1005 If the reference file a symbolic reference, only read the first line of
1006 the file. Otherwise, only read the first 40 bytes.
1008 Args:
1009 name: the refname to read, relative to refpath
1010 Returns: The contents of the ref file, or None if the file does not
1011 exist.
1013 Raises:
1014 IOError: if any other error occurs
1015 """
1016 filename = self.refpath(name)
1017 try:
1018 with GitFile(filename, "rb") as f:
1019 header = f.read(len(SYMREF))
1020 if header == SYMREF:
1021 # Read only the first line
1022 return header + next(iter(f)).rstrip(b"\r\n")
1023 else:
1024 # Read only the first 40 bytes
1025 return header + f.read(40 - len(SYMREF))
1026 except (OSError, UnicodeError):
1027 # don't assume anything specific about the error; in
1028 # particular, invalid or forbidden paths can raise weird
1029 # errors depending on the specific operating system
1030 return None
1032 def _remove_packed_ref(self, name: bytes) -> None:
1033 if self._packed_refs is None:
1034 return
1035 filename = os.path.join(self.path, b"packed-refs")
1036 # reread cached refs from disk, while holding the lock
1037 f = GitFile(filename, "wb")
1038 try:
1039 self._packed_refs = None
1040 self.get_packed_refs()
1042 if self._packed_refs is None or name not in self._packed_refs:
1043 f.abort()
1044 return
1046 del self._packed_refs[name]
1047 if self._peeled_refs is not None:
1048 with suppress(KeyError):
1049 del self._peeled_refs[name]
1050 write_packed_refs(f, self._packed_refs, self._peeled_refs)
1051 f.close()
1052 except BaseException:
1053 f.abort()
1054 raise
1056 def set_symbolic_ref(
1057 self,
1058 name: bytes,
1059 other: bytes,
1060 committer: Optional[bytes] = None,
1061 timestamp: Optional[int] = None,
1062 timezone: Optional[int] = None,
1063 message: Optional[bytes] = None,
1064 ) -> None:
1065 """Make a ref point at another ref.
1067 Args:
1068 name: Name of the ref to set
1069 other: Name of the ref to point at
1070 committer: Optional committer name
1071 timestamp: Optional timestamp
1072 timezone: Optional timezone
1073 message: Optional message to describe the change
1074 """
1075 self._check_refname(name)
1076 self._check_refname(other)
1077 filename = self.refpath(name)
1078 f = GitFile(filename, "wb")
1079 try:
1080 f.write(SYMREF + other + b"\n")
1081 sha = self.follow(name)[-1]
1082 self._log(
1083 name,
1084 sha,
1085 sha,
1086 committer=committer,
1087 timestamp=timestamp,
1088 timezone=timezone,
1089 message=message,
1090 )
1091 except BaseException:
1092 f.abort()
1093 raise
1094 else:
1095 f.close()
1097 def set_if_equals(
1098 self,
1099 name: bytes,
1100 old_ref: Optional[bytes],
1101 new_ref: bytes,
1102 committer: Optional[bytes] = None,
1103 timestamp: Optional[int] = None,
1104 timezone: Optional[int] = None,
1105 message: Optional[bytes] = None,
1106 ) -> bool:
1107 """Set a refname to new_ref only if it currently equals old_ref.
1109 This method follows all symbolic references, and can be used to perform
1110 an atomic compare-and-swap operation.
1112 Args:
1113 name: The refname to set.
1114 old_ref: The old sha the refname must refer to, or None to set
1115 unconditionally.
1116 new_ref: The new sha the refname will refer to.
1117 committer: Optional committer name
1118 timestamp: Optional timestamp
1119 timezone: Optional timezone
1120 message: Set message for reflog
1121 Returns: True if the set was successful, False otherwise.
1122 """
1123 self._check_refname(name)
1124 try:
1125 realnames, _ = self.follow(name)
1126 realname = realnames[-1]
1127 except (KeyError, IndexError, SymrefLoop):
1128 realname = name
1129 filename = self.refpath(realname)
1131 # make sure none of the ancestor folders is in packed refs
1132 probe_ref = os.path.dirname(realname)
1133 packed_refs = self.get_packed_refs()
1134 while probe_ref:
1135 if packed_refs.get(probe_ref, None) is not None:
1136 raise NotADirectoryError(filename)
1137 probe_ref = os.path.dirname(probe_ref)
1139 ensure_dir_exists(os.path.dirname(filename))
1140 with GitFile(filename, "wb") as f:
1141 if old_ref is not None:
1142 try:
1143 # read again while holding the lock to handle race conditions
1144 orig_ref = self.read_loose_ref(realname)
1145 if orig_ref is None:
1146 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
1147 if orig_ref != old_ref:
1148 f.abort()
1149 return False
1150 except OSError:
1151 f.abort()
1152 raise
1154 # Check if ref already has the desired value while holding the lock
1155 # This avoids fsync when ref is unchanged but still detects lock conflicts
1156 current_ref = self.read_loose_ref(realname)
1157 if current_ref is None:
1158 current_ref = packed_refs.get(realname, None)
1160 if current_ref is not None and current_ref == new_ref:
1161 # Ref already has desired value, abort write to avoid fsync
1162 f.abort()
1163 return True
1165 try:
1166 f.write(new_ref + b"\n")
1167 except OSError:
1168 f.abort()
1169 raise
1170 self._log(
1171 realname,
1172 old_ref,
1173 new_ref,
1174 committer=committer,
1175 timestamp=timestamp,
1176 timezone=timezone,
1177 message=message,
1178 )
1179 return True
1181 def add_if_new(
1182 self,
1183 name: bytes,
1184 ref: bytes,
1185 committer: Optional[bytes] = None,
1186 timestamp: Optional[int] = None,
1187 timezone: Optional[int] = None,
1188 message: Optional[bytes] = None,
1189 ) -> bool:
1190 """Add a new reference only if it does not already exist.
1192 This method follows symrefs, and only ensures that the last ref in the
1193 chain does not exist.
1195 Args:
1196 name: The refname to set.
1197 ref: The new sha the refname will refer to.
1198 committer: Optional committer name
1199 timestamp: Optional timestamp
1200 timezone: Optional timezone
1201 message: Optional message for reflog
1202 Returns: True if the add was successful, False otherwise.
1203 """
1204 try:
1205 realnames, contents = self.follow(name)
1206 if contents is not None:
1207 return False
1208 realname = realnames[-1]
1209 except (KeyError, IndexError):
1210 realname = name
1211 self._check_refname(realname)
1212 filename = self.refpath(realname)
1213 ensure_dir_exists(os.path.dirname(filename))
1214 with GitFile(filename, "wb") as f:
1215 if os.path.exists(filename) or name in self.get_packed_refs():
1216 f.abort()
1217 return False
1218 try:
1219 f.write(ref + b"\n")
1220 except OSError:
1221 f.abort()
1222 raise
1223 else:
1224 self._log(
1225 name,
1226 None,
1227 ref,
1228 committer=committer,
1229 timestamp=timestamp,
1230 timezone=timezone,
1231 message=message,
1232 )
1233 return True
1235 def remove_if_equals(
1236 self,
1237 name: bytes,
1238 old_ref: Optional[bytes],
1239 committer: Optional[bytes] = None,
1240 timestamp: Optional[int] = None,
1241 timezone: Optional[int] = None,
1242 message: Optional[bytes] = None,
1243 ) -> bool:
1244 """Remove a refname only if it currently equals old_ref.
1246 This method does not follow symbolic references. It can be used to
1247 perform an atomic compare-and-delete operation.
1249 Args:
1250 name: The refname to delete.
1251 old_ref: The old sha the refname must refer to, or None to
1252 delete unconditionally.
1253 committer: Optional committer name
1254 timestamp: Optional timestamp
1255 timezone: Optional timezone
1256 message: Optional message
1257 Returns: True if the delete was successful, False otherwise.
1258 """
1259 self._check_refname(name)
1260 filename = self.refpath(name)
1261 ensure_dir_exists(os.path.dirname(filename))
1262 f = GitFile(filename, "wb")
1263 try:
1264 if old_ref is not None:
1265 orig_ref = self.read_loose_ref(name)
1266 if orig_ref is None:
1267 orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
1268 if orig_ref != old_ref:
1269 return False
1271 # remove the reference file itself
1272 try:
1273 found = os.path.lexists(filename)
1274 except OSError:
1275 # may only be packed, or otherwise unstorable
1276 found = False
1278 if found:
1279 os.remove(filename)
1281 self._remove_packed_ref(name)
1282 self._log(
1283 name,
1284 old_ref,
1285 None,
1286 committer=committer,
1287 timestamp=timestamp,
1288 timezone=timezone,
1289 message=message,
1290 )
1291 finally:
1292 # never write, we just wanted the lock
1293 f.abort()
1295 # outside of the lock, clean-up any parent directory that might now
1296 # be empty. this ensures that re-creating a reference of the same
1297 # name of what was previously a directory works as expected
1298 parent = name
1299 while True:
1300 try:
1301 parent, _ = parent.rsplit(b"/", 1)
1302 except ValueError:
1303 break
1305 if parent == b"refs":
1306 break
1307 parent_filename = self.refpath(parent)
1308 try:
1309 os.rmdir(parent_filename)
1310 except OSError:
1311 # this can be caused by the parent directory being
1312 # removed by another process, being not empty, etc.
1313 # in any case, this is non fatal because we already
1314 # removed the reference, just ignore it
1315 break
1317 return True
1319 def pack_refs(self, all: bool = False) -> None:
1320 """Pack loose refs into packed-refs file.
1322 Args:
1323 all: If True, pack all refs. If False, only pack tags.
1324 """
1325 refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
1326 for ref in self.allkeys():
1327 if ref == HEADREF:
1328 # Never pack HEAD
1329 continue
1330 if all or ref.startswith(LOCAL_TAG_PREFIX):
1331 try:
1332 sha = self[ref]
1333 if sha:
1334 refs_to_pack[ref] = sha
1335 except KeyError:
1336 # Broken ref, skip it
1337 pass
1339 if refs_to_pack:
1340 self.add_packed_refs(refs_to_pack)
1343def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
1344 """Split a single ref line into a tuple of SHA1 and name."""
1345 fields = line.rstrip(b"\n\r").split(b" ")
1346 if len(fields) != 2:
1347 raise PackedRefsException(f"invalid ref line {line!r}")
1348 sha, name = fields
1349 if not valid_hexsha(sha):
1350 raise PackedRefsException(f"Invalid hex sha {sha!r}")
1351 if not check_ref_format(name):
1352 raise PackedRefsException(f"invalid ref name {name!r}")
1353 return (sha, name)
1356def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
1357 """Read a packed refs file.
1359 Args:
1360 f: file-like object to read from
1361 Returns: Iterator over tuples with SHA1s and ref names.
1362 """
1363 for line in f:
1364 if line.startswith(b"#"):
1365 # Comment
1366 continue
1367 if line.startswith(b"^"):
1368 raise PackedRefsException("found peeled ref in packed-refs without peeled")
1369 yield _split_ref_line(line)
1372def read_packed_refs_with_peeled(
1373 f: IO[bytes],
1374) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
1375 """Read a packed refs file including peeled refs.
1377 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
1378 with ref names, SHA1s, and peeled SHA1s (or None).
1380 Args:
1381 f: file-like object to read from, seek'ed to the second line
1382 """
1383 last = None
1384 for line in f:
1385 if line.startswith(b"#"):
1386 continue
1387 line = line.rstrip(b"\r\n")
1388 if line.startswith(b"^"):
1389 if not last:
1390 raise PackedRefsException("unexpected peeled ref line")
1391 if not valid_hexsha(line[1:]):
1392 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
1393 sha, name = _split_ref_line(last)
1394 last = None
1395 yield (sha, name, line[1:])
1396 else:
1397 if last:
1398 sha, name = _split_ref_line(last)
1399 yield (sha, name, None)
1400 last = line
1401 if last:
1402 sha, name = _split_ref_line(last)
1403 yield (sha, name, None)
1406def write_packed_refs(
1407 f: IO[bytes],
1408 packed_refs: Mapping[bytes, bytes],
1409 peeled_refs: Optional[Mapping[bytes, bytes]] = None,
1410) -> None:
1411 """Write a packed refs file.
1413 Args:
1414 f: empty file-like object to write to
1415 packed_refs: dict of refname to sha of packed refs to write
1416 peeled_refs: dict of refname to peeled value of sha
1417 """
1418 if peeled_refs is None:
1419 peeled_refs = {}
1420 else:
1421 f.write(b"# pack-refs with: peeled\n")
1422 for refname in sorted(packed_refs.keys()):
1423 f.write(git_line(packed_refs[refname], refname))
1424 if refname in peeled_refs:
1425 f.write(b"^" + peeled_refs[refname] + b"\n")
1428def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
1429 """Read info/refs file.
1431 Args:
1432 f: File-like object to read from
1434 Returns:
1435 Dictionary mapping ref names to SHA1s
1436 """
1437 ret = {}
1438 for line in f.readlines():
1439 (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
1440 ret[name] = sha
1441 return ret
1444def write_info_refs(
1445 refs: Mapping[bytes, bytes], store: ObjectContainer
1446) -> Iterator[bytes]:
1447 """Generate info refs."""
1448 # TODO: Avoid recursive import :(
1449 from .object_store import peel_sha
1451 for name, sha in sorted(refs.items()):
1452 # get_refs() includes HEAD as a special case, but we don't want to
1453 # advertise it
1454 if name == HEADREF:
1455 continue
1456 try:
1457 o = store[sha]
1458 except KeyError:
1459 continue
1460 _unpeeled, peeled = peel_sha(store, sha)
1461 yield o.id + b"\t" + name + b"\n"
1462 if o.id != peeled.id:
1463 yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
1466def is_local_branch(x: bytes) -> bool:
1467 """Check if a ref name is a local branch."""
1468 return x.startswith(LOCAL_BRANCH_PREFIX)
1471T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])
1474def strip_peeled_refs(refs: T) -> T:
1475 """Remove all peeled refs."""
1476 return {
1477 ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
1478 }
1481def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
1482 """Split peeled refs from regular refs."""
1483 peeled: dict[bytes, bytes] = {}
1484 regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)}
1486 for ref, sha in refs.items():
1487 if ref.endswith(PEELED_TAG_SUFFIX):
1488 # Only add to peeled dict if sha is not None
1489 if sha is not None:
1490 peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
1492 return regular, peeled
1495def _set_origin_head(
1496 refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
1497) -> None:
1498 # set refs/remotes/origin/HEAD
1499 origin_base = b"refs/remotes/" + origin + b"/"
1500 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
1501 origin_ref = origin_base + HEADREF
1502 target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
1503 if target_ref in refs:
1504 refs.set_symbolic_ref(origin_ref, target_ref)
1507def _set_default_branch(
1508 refs: RefsContainer,
1509 origin: bytes,
1510 origin_head: Optional[bytes],
1511 branch: Optional[bytes],
1512 ref_message: Optional[bytes],
1513) -> bytes:
1514 """Set the default branch."""
1515 origin_base = b"refs/remotes/" + origin + b"/"
1516 if branch:
1517 origin_ref = origin_base + branch
1518 if origin_ref in refs:
1519 local_ref = LOCAL_BRANCH_PREFIX + branch
1520 refs.add_if_new(local_ref, refs[origin_ref], ref_message)
1521 head_ref = local_ref
1522 elif LOCAL_TAG_PREFIX + branch in refs:
1523 head_ref = LOCAL_TAG_PREFIX + branch
1524 else:
1525 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
1526 elif origin_head:
1527 head_ref = origin_head
1528 if origin_head.startswith(LOCAL_BRANCH_PREFIX):
1529 origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
1530 else:
1531 origin_ref = origin_head
1532 try:
1533 refs.add_if_new(head_ref, refs[origin_ref], ref_message)
1534 except KeyError:
1535 pass
1536 else:
1537 raise ValueError("neither origin_head nor branch are provided")
1538 return head_ref
1541def _set_head(
1542 refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
1543) -> Optional[bytes]:
1544 if head_ref.startswith(LOCAL_TAG_PREFIX):
1545 # detach HEAD at specified tag
1546 head = refs[head_ref]
1547 if isinstance(head, Tag):
1548 _cls, obj = head.object
1549 head = obj.get_object(obj).id
1550 del refs[HEADREF]
1551 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1552 else:
1553 # set HEAD to specific branch
1554 try:
1555 head = refs[head_ref]
1556 refs.set_symbolic_ref(HEADREF, head_ref)
1557 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1558 except KeyError:
1559 head = None
1560 return head
1563def _import_remote_refs(
1564 refs_container: RefsContainer,
1565 remote_name: str,
1566 refs: dict[bytes, Optional[bytes]],
1567 message: Optional[bytes] = None,
1568 prune: bool = False,
1569 prune_tags: bool = False,
1570) -> None:
1571 stripped_refs = strip_peeled_refs(refs)
1572 branches = {
1573 n[len(LOCAL_BRANCH_PREFIX) :]: v
1574 for (n, v) in stripped_refs.items()
1575 if n.startswith(LOCAL_BRANCH_PREFIX) and v is not None
1576 }
1577 refs_container.import_refs(
1578 b"refs/remotes/" + remote_name.encode(),
1579 branches,
1580 message=message,
1581 prune=prune,
1582 )
1583 tags = {
1584 n[len(LOCAL_TAG_PREFIX) :]: v
1585 for (n, v) in stripped_refs.items()
1586 if n.startswith(LOCAL_TAG_PREFIX)
1587 and not n.endswith(PEELED_TAG_SUFFIX)
1588 and v is not None
1589 }
1590 refs_container.import_refs(
1591 LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
1592 )
1595def serialize_refs(
1596 store: ObjectContainer, refs: Mapping[bytes, bytes]
1597) -> dict[bytes, bytes]:
1598 """Serialize refs with peeled refs.
1600 Args:
1601 store: Object store to peel refs from
1602 refs: Dictionary of ref names to SHAs
1604 Returns:
1605 Dictionary with refs and peeled refs (marked with ^{})
1606 """
1607 # TODO: Avoid recursive import :(
1608 from .object_store import peel_sha
1610 ret = {}
1611 for ref, sha in refs.items():
1612 try:
1613 unpeeled, peeled = peel_sha(store, sha)
1614 except KeyError:
1615 warnings.warn(
1616 "ref {} points at non-present sha {}".format(
1617 ref.decode("utf-8", "replace"), sha.decode("ascii")
1618 ),
1619 UserWarning,
1620 )
1621 continue
1622 else:
1623 if isinstance(unpeeled, Tag):
1624 ret[ref + PEELED_TAG_SUFFIX] = peeled.id
1625 ret[ref] = unpeeled.id
1626 return ret
1629class locked_ref:
1630 """Lock a ref while making modifications.
1632 Works as a context manager.
1633 """
1635 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
1636 """Initialize a locked ref.
1638 Args:
1639 refs_container: The DiskRefsContainer to lock the ref in
1640 refname: The ref name to lock
1641 """
1642 self._refs_container = refs_container
1643 self._refname = refname
1644 self._file: Optional[_GitFile] = None
1645 self._realname: Optional[Ref] = None
1646 self._deleted = False
1648 def __enter__(self) -> "locked_ref":
1649 """Enter the context manager and acquire the lock.
1651 Returns:
1652 This locked_ref instance
1654 Raises:
1655 OSError: If the lock cannot be acquired
1656 """
1657 self._refs_container._check_refname(self._refname)
1658 try:
1659 realnames, _ = self._refs_container.follow(self._refname)
1660 self._realname = realnames[-1]
1661 except (KeyError, IndexError, SymrefLoop):
1662 self._realname = self._refname
1664 filename = self._refs_container.refpath(self._realname)
1665 ensure_dir_exists(os.path.dirname(filename))
1666 f = GitFile(filename, "wb")
1667 self._file = f
1668 return self
1670 def __exit__(
1671 self,
1672 exc_type: Optional[type],
1673 exc_value: Optional[BaseException],
1674 traceback: Optional[types.TracebackType],
1675 ) -> None:
1676 """Exit the context manager and release the lock.
1678 Args:
1679 exc_type: Type of exception if one occurred
1680 exc_value: Exception instance if one occurred
1681 traceback: Traceback if an exception occurred
1682 """
1683 if self._file:
1684 if exc_type is not None or self._deleted:
1685 self._file.abort()
1686 else:
1687 self._file.close()
1689 def get(self) -> Optional[bytes]:
1690 """Get the current value of the ref."""
1691 if not self._file:
1692 raise RuntimeError("locked_ref not in context")
1694 assert self._realname is not None
1695 current_ref = self._refs_container.read_loose_ref(self._realname)
1696 if current_ref is None:
1697 current_ref = self._refs_container.get_packed_refs().get(
1698 self._realname, None
1699 )
1700 return current_ref
1702 def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
1703 """Ensure the ref currently equals the expected value.
1705 Args:
1706 expected_value: The expected current value of the ref
1707 Returns:
1708 True if the ref equals the expected value, False otherwise
1709 """
1710 current_value = self.get()
1711 return current_value == expected_value
1713 def set(self, new_ref: bytes) -> None:
1714 """Set the ref to a new value.
1716 Args:
1717 new_ref: The new SHA1 or symbolic ref value
1718 """
1719 if not self._file:
1720 raise RuntimeError("locked_ref not in context")
1722 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
1723 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
1725 self._file.seek(0)
1726 self._file.truncate()
1727 self._file.write(new_ref + b"\n")
1728 self._deleted = False
1730 def set_symbolic_ref(self, target: Ref) -> None:
1731 """Make this ref point at another ref.
1733 Args:
1734 target: Name of the ref to point at
1735 """
1736 if not self._file:
1737 raise RuntimeError("locked_ref not in context")
1739 self._refs_container._check_refname(target)
1740 self._file.seek(0)
1741 self._file.truncate()
1742 self._file.write(SYMREF + target + b"\n")
1743 self._deleted = False
1745 def delete(self) -> None:
1746 """Delete the ref file while holding the lock."""
1747 if not self._file:
1748 raise RuntimeError("locked_ref not in context")
1750 # Delete the actual ref file while holding the lock
1751 if self._realname:
1752 filename = self._refs_container.refpath(self._realname)
1753 try:
1754 if os.path.lexists(filename):
1755 os.remove(filename)
1756 except FileNotFoundError:
1757 pass
1758 self._refs_container._remove_packed_ref(self._realname)
1760 self._deleted = True
1763def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
1764 """Filter refs to only include those with a given prefix.
1766 Args:
1767 refs: A dictionary of refs.
1768 prefixes: The prefixes to filter by.
1769 """
1770 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
1771 return filtered
1774def is_per_worktree_ref(ref: bytes) -> bool:
1775 """Returns whether a reference is stored per worktree or not.
1777 Per-worktree references are:
1778 - all pseudorefs, e.g. HEAD
1779 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
1781 All refs starting with "refs/" are shared, except for the ones listed above.
1783 See https://git-scm.com/docs/git-worktree#_refs.
1784 """
1785 return not ref.startswith(b"refs/") or ref.startswith(
1786 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
1787 )