Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import types
27import warnings
28from collections.abc import Iterable, Iterator
29from contextlib import suppress
30from typing import (
31 IO,
32 TYPE_CHECKING,
33 Any,
34 BinaryIO,
35 Callable,
36 Optional,
37 TypeVar,
38 Union,
39 cast,
40)
42if TYPE_CHECKING:
43 from .file import _GitFile
45from .errors import PackedRefsException, RefFormatError
46from .file import GitFile, ensure_dir_exists
47from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
48from .pack import ObjectContainer
# Ref names are handled as plain byte strings throughout this module.
Ref = bytes

# Name of the HEAD ref, and the marker that prefixes the contents of a
# symbolic ref (see parse_symref_value).
HEADREF = b"HEAD"
SYMREF = b"ref: "
# Well-known ref namespace prefixes.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values that check_ref_format() rejects anywhere in a ref name
# (iterating the bytes literal yields ints, so this is a set of ints).
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# Suffix appended to a tag ref name to denote its peeled value.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    Attributes:
      ref: The ref name whose resolution was abandoned.
      depth: The resolution depth reported by the raiser.
    """

    def __init__(self, ref: bytes, depth: int) -> None:
        """Initialize SymrefLoop exception.

        Args:
          ref: The ref name that was being resolved
          depth: The depth reached when resolution was abandoned
        """
        # Hand the arguments to Exception as well: this populates ``args`` so
        # that str()/repr() are informative and so that copy/pickle, which
        # re-invoke __init__ with ``self.args``, can rebuild the exception.
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth
def parse_symref_value(contents: bytes) -> bytes:
    """Extract the destination of a symbolic ref from its raw contents.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The destination ref name, with trailing CR/LF bytes removed
    Raises:
      ValueError: If contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF) :]
    return destination.rstrip(b"\r\n")
def check_ref_format(refname: Ref) -> bool:
    """Report whether a refname is well formed.

    The checks follow the rules listed by git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # No path component may begin with a dot.
    if refname.startswith(b".") or b"/." in refname:
        return False
    # At least one slash is required.
    if b"/" not in refname:
        return False
    # Byte sequences that are forbidden anywhere in the name.
    if any(seq in refname for seq in (b"..", b"@{", b"\\")):
        return False
    # Iterating bytes yields ints: reject control bytes (below 0o40) and
    # every byte listed in BAD_REF_CHARS.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # Forbidden endings; refname is known to be non-empty at this point
    # because it contains a slash.
    if refname.endswith((b"/", b".", b".lock")):
        return False
    return True
def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); everything after the first
      slash following the remote name belongs to the branch name

    Raises:
      ValueError: If ref is not a valid remote ref
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    # Drop the prefix, then cut at the first slash only.
    remote_name, slash, branch_name = ref[len(LOCAL_REMOTE_PREFIX) :].partition(b"/")
    if not slash:
        # No separator at all: there is a remote name but no branch part.
        raise ValueError(f"Invalid remote ref format: {ref!r}")
    return (remote_name, branch_name)
class RefsContainer:
    """A container for refs.

    This base class holds the storage-independent logic (lookup, symref
    resolution, dict-style access, bulk import). Storage-specific operations
    raise NotImplementedError here and are expected to be overridden.
    """

    def __init__(
        self,
        logger: Optional[
            Callable[
                [
                    bytes,
                    Optional[bytes],
                    Optional[bytes],
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    Optional[bytes],
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Initialize RefsContainer with optional logger function.

        Args:
          logger: Optional callable invoked by _log() as
            logger(ref, old_sha, new_sha, committer, timestamp, timezone,
            message)
        """
        self._logger = logger

    def _log(
        self,
        ref: bytes,
        old_sha: Optional[bytes],
        new_sha: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Forward a ref change to the logger, if there is one.

        Nothing is logged when no logger was configured or when no message
        is given.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name: bytes) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, Optional[ObjectID]],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import refs from another repository.

        Args:
          base: Base ref to import into (e.g., b'refs/remotes/origin')
          other: Dictionary of refs to import; a value of None requests
            removal of that ref under base
          committer: Optional committer for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
          prune: If True, remove refs not in other
        """
        to_delete: set[Ref] = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
                continue
            self.set_if_equals(
                b"/".join((base, name)),
                None,
                value,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
            # This ref was just written, so it must survive pruning. Only
            # refs that were actually set are spared; refs explicitly mapped
            # to None stay scheduled for deletion.
            to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> set[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self) -> Iterator[Ref]:
        """Iterate over all reference keys."""
        return iter(self.allkeys())

    def keys(self, base: Optional[bytes] = None) -> set[bytes]:
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base: bytes) -> set[bytes]:
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under. A trailing slash is optional.
            An empty base selects every ref.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        stem = base.rstrip(b"/")
        # Match on "<base>/" rather than on the bare base so that a sibling
        # namespace sharing the same leading bytes (b"refs/headsX/..." for
        # base b"refs/heads") is not mistaken for a child.
        prefix = stem + b"/" if stem else b""
        return {
            refname[len(prefix) :]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Args:
          base: Optional base; when given, only refs under it are returned
            and the keys are relative to it
        Returns: Mapping of ref name to resolved SHA. Refs that cannot be
            resolved (missing target or symref loop) are left out.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name: bytes) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. b"refs/stash" is accepted as
        well. Also, check_ref_format expects refnames without the leading
        'refs/', but this class requires that so it cannot touch anything
        outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname: bytes) -> Optional[bytes]:
        """Read a reference without following any references.

        The loose ref is consulted first; the packed refs are used only when
        no loose value is found.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        Raises:
          SymrefLoop: if more than five refs in the chain had contents
        """
        # Seed the loop as though `name` itself had been read from a symref.
        contents: Optional[bytes] = SYMREF + name
        depth = 0
        refnames = []
        while contents and contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname: bytes) -> bool:
        """Check if a reference exists."""
        if self.read_ref(refname):
            return True
        return False

    def __getitem__(self, name: bytes) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference does not resolve to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name: bytes, ref: bytes) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref value
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name: bytes) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self) -> dict[bytes, bytes]:
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            try:
                ref_value = self.read_ref(src)
                if ref_value is None:
                    # Listed by allkeys() but not readable; skip it rather
                    # than rely on an assert, which is stripped under -O.
                    continue
                dst = parse_symref_value(ref_value)
            except ValueError:
                # Not a symbolic ref.
                continue
            ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
            all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)
class DictRefsContainer(RefsContainer):
    """RefsContainer that keeps its refs in an in-memory dict.

    There are no packed refs in this container and it is not threadsafe.
    """

    def __init__(
        self,
        refs: dict[bytes, bytes],
        logger: Optional[
            Callable[
                [
                    bytes,
                    Optional[bytes],
                    Optional[bytes],
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    Optional[bytes],
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Set up the container around an existing refs dictionary.

        Args:
          refs: Mapping of ref name to value; it is used directly (not
            copied) and mutated by the write operations
          logger: Optional logger callable, handed to RefsContainer
        """
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self) -> set[bytes]:
        """Return the names of all refs currently stored."""
        return set(self._refs)

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Return the stored value for name, or None when it is absent."""
        return self._refs.get(name)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Return an empty mapping: this container has no packed refs."""
        return {}

    def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
        """Tell every registered watcher about a (ref, new value) change."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
        """
        # The value that `name` resolved to before the change is what gets
        # logged as the old value.
        _, previous = self.follow(name)
        symref_value = SYMREF + other
        self._refs[name] = symref_value
        self._notify(name, symref_value)
        self._log(
            name,
            previous,
            symref_value,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        Only the entry stored under name itself is compared and replaced;
        symbolic references are not followed here.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the set was successful, False otherwise.
        """
        if old_ref is not None:
            # A missing ref compares as ZERO_SHA.
            if self._refs.get(name, ZERO_SHA) != old_ref:
                return False
        # Validation happens only once the comparison has passed.
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the add was successful, False otherwise.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the delete was successful, False otherwise. Deleting a
          ref that is not present counts as success.
        """
        if old_ref is not None:
            # A missing ref compares as ZERO_SHA.
            if self._refs.get(name, ZERO_SHA) != old_ref:
                return False
        if name in self._refs:
            removed = self._refs.pop(name)
            # Watchers and the log only hear about refs that really existed.
            self._notify(name, None)
            self._log(
                name,
                removed,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Return the cached peeled value for name, or None if none is cached."""
        return self._peeled.get(name)

    def _update(self, refs: dict[bytes, bytes]) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, value in refs.items():
            self.set_if_equals(refname, None, value)

    def _update_peeled(self, peeled: dict[bytes, bytes]) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file.

    The contents are parsed once, at construction time; no write operations
    are implemented by this class.
    """

    def __init__(self, f: BinaryIO) -> None:
        """Initialize InfoRefsContainer from info/refs file.

        Args:
          f: Binary file-like object holding the info/refs contents
        """
        # Initialise the base class so the attributes it defines (notably
        # self._logger, used by the inherited _log) exist on this instance.
        super().__init__()
        refs, peeled = split_peeled_refs(read_info_refs(f))
        self._refs: dict[bytes, bytes] = refs
        self._peeled: dict[bytes, bytes] = peeled

    def allkeys(self) -> set[bytes]:
        """Return all reference keys."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference.

        Returns: The value recorded for name, or None if it is unknown.
        """
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Get packed references (always empty for this container)."""
        return {}

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Get peeled version of a reference.

        Returns: The recorded peeled value when there is one, otherwise the
            ref's own value.
        Raises:
          KeyError: if name is not a known ref
        """
        try:
            return self._peeled[name]
        except KeyError:
            # No peeled entry: fall back to the ref's own value.
            return self._refs[name]
797class DiskRefsContainer(RefsContainer):
798 """Refs container that reads refs from disk."""
800 def __init__(
801 self,
802 path: Union[str, bytes, os.PathLike],
803 worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
804 logger: Optional[
805 Callable[
806 [
807 bytes,
808 Optional[bytes],
809 Optional[bytes],
810 Optional[bytes],
811 Optional[int],
812 Optional[int],
813 Optional[bytes],
814 ],
815 None,
816 ]
817 ] = None,
818 ) -> None:
819 """Initialize DiskRefsContainer."""
820 super().__init__(logger=logger)
821 # Convert path-like objects to strings, then to bytes for Git compatibility
822 self.path = os.fsencode(os.fspath(path))
823 if worktree_path is None:
824 self.worktree_path = self.path
825 else:
826 self.worktree_path = os.fsencode(os.fspath(worktree_path))
827 self._packed_refs: Optional[dict[bytes, bytes]] = None
828 self._peeled_refs: Optional[dict[bytes, bytes]] = None
830 def __repr__(self) -> str:
831 """Return string representation of DiskRefsContainer."""
832 return f"{self.__class__.__name__}({self.path!r})"
834 def subkeys(self, base: bytes) -> set[bytes]:
835 """Return subkeys under a given base reference path."""
836 subkeys = set()
837 path = self.refpath(base)
838 for root, unused_dirs, files in os.walk(path):
839 directory = root[len(path) :]
840 if os.path.sep != "/":
841 directory = directory.replace(os.fsencode(os.path.sep), b"/")
842 directory = directory.strip(b"/")
843 for filename in files:
844 refname = b"/".join(([directory] if directory else []) + [filename])
845 # check_ref_format requires at least one /, so we prepend the
846 # base before calling it.
847 if check_ref_format(base + b"/" + refname):
848 subkeys.add(refname)
849 for key in self.get_packed_refs():
850 if key.startswith(base):
851 subkeys.add(key[len(base) :].strip(b"/"))
852 return subkeys
854 def allkeys(self) -> set[bytes]:
855 """Return all reference keys."""
856 allkeys = set()
857 if os.path.exists(self.refpath(HEADREF)):
858 allkeys.add(HEADREF)
859 path = self.refpath(b"")
860 refspath = self.refpath(b"refs")
861 for root, unused_dirs, files in os.walk(refspath):
862 directory = root[len(path) :]
863 if os.path.sep != "/":
864 directory = directory.replace(os.fsencode(os.path.sep), b"/")
865 for filename in files:
866 refname = b"/".join([directory, filename])
867 if check_ref_format(refname):
868 allkeys.add(refname)
869 allkeys.update(self.get_packed_refs())
870 return allkeys
872 def refpath(self, name: bytes) -> bytes:
873 """Return the disk path of a ref."""
874 if os.path.sep != "/":
875 name = name.replace(b"/", os.fsencode(os.path.sep))
876 # TODO: as the 'HEAD' reference is working tree specific, it
877 # should actually not be a part of RefsContainer
878 if name == HEADREF:
879 return os.path.join(self.worktree_path, name)
880 else:
881 return os.path.join(self.path, name)
883 def get_packed_refs(self) -> dict[bytes, bytes]:
884 """Get contents of the packed-refs file.
886 Returns: Dictionary mapping ref names to SHA1s
888 Note: Will return an empty dictionary when no packed-refs file is
889 present.
890 """
891 # TODO: invalidate the cache on repacking
892 if self._packed_refs is None:
893 # set both to empty because we want _peeled_refs to be
894 # None if and only if _packed_refs is also None.
895 self._packed_refs = {}
896 self._peeled_refs = {}
897 path = os.path.join(self.path, b"packed-refs")
898 try:
899 f = GitFile(path, "rb")
900 except FileNotFoundError:
901 return {}
902 with f:
903 first_line = next(iter(f)).rstrip()
904 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
905 for sha, name, peeled in read_packed_refs_with_peeled(f):
906 self._packed_refs[name] = sha
907 if peeled:
908 self._peeled_refs[name] = peeled
909 else:
910 f.seek(0)
911 for sha, name in read_packed_refs(f):
912 self._packed_refs[name] = sha
913 return self._packed_refs
915 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
916 """Add the given refs as packed refs.
918 Args:
919 new_refs: A mapping of ref names to targets; if a target is None that
920 means remove the ref
921 """
922 if not new_refs:
923 return
925 path = os.path.join(self.path, b"packed-refs")
927 with GitFile(path, "wb") as f:
928 # reread cached refs from disk, while holding the lock
929 packed_refs = self.get_packed_refs().copy()
931 for ref, target in new_refs.items():
932 # sanity check
933 if ref == HEADREF:
934 raise ValueError("cannot pack HEAD")
936 # remove any loose refs pointing to this one -- please
937 # note that this bypasses remove_if_equals as we don't
938 # want to affect packed refs in here
939 with suppress(OSError):
940 os.remove(self.refpath(ref))
942 if target is not None:
943 packed_refs[ref] = target
944 else:
945 packed_refs.pop(ref, None)
947 write_packed_refs(f, packed_refs, self._peeled_refs)
949 self._packed_refs = packed_refs
951 def get_peeled(self, name: bytes) -> Optional[bytes]:
952 """Return the cached peeled value of a ref, if available.
954 Args:
955 name: Name of the ref to peel
956 Returns: The peeled value of the ref. If the ref is known not point to
957 a tag, this will be the SHA the ref refers to. If the ref may point
958 to a tag, but no cached information is available, None is returned.
959 """
960 self.get_packed_refs()
961 if (
962 self._peeled_refs is None
963 or self._packed_refs is None
964 or name not in self._packed_refs
965 ):
966 # No cache: no peeled refs were read, or this ref is loose
967 return None
968 if name in self._peeled_refs:
969 return self._peeled_refs[name]
970 else:
971 # Known not peelable
972 return self[name]
974 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
975 """Read a reference file and return its contents.
977 If the reference file a symbolic reference, only read the first line of
978 the file. Otherwise, only read the first 40 bytes.
980 Args:
981 name: the refname to read, relative to refpath
982 Returns: The contents of the ref file, or None if the file does not
983 exist.
985 Raises:
986 IOError: if any other error occurs
987 """
988 filename = self.refpath(name)
989 try:
990 with GitFile(filename, "rb") as f:
991 header = f.read(len(SYMREF))
992 if header == SYMREF:
993 # Read only the first line
994 return header + next(iter(f)).rstrip(b"\r\n")
995 else:
996 # Read only the first 40 bytes
997 return header + f.read(40 - len(SYMREF))
998 except (OSError, UnicodeError):
999 # don't assume anything specific about the error; in
1000 # particular, invalid or forbidden paths can raise weird
1001 # errors depending on the specific operating system
1002 return None
1004 def _remove_packed_ref(self, name: bytes) -> None:
1005 if self._packed_refs is None:
1006 return
1007 filename = os.path.join(self.path, b"packed-refs")
1008 # reread cached refs from disk, while holding the lock
1009 f = GitFile(filename, "wb")
1010 try:
1011 self._packed_refs = None
1012 self.get_packed_refs()
1014 if self._packed_refs is None or name not in self._packed_refs:
1015 f.abort()
1016 return
1018 del self._packed_refs[name]
1019 if self._peeled_refs is not None:
1020 with suppress(KeyError):
1021 del self._peeled_refs[name]
1022 write_packed_refs(f, self._packed_refs, self._peeled_refs)
1023 f.close()
1024 except BaseException:
1025 f.abort()
1026 raise
1028 def set_symbolic_ref(
1029 self,
1030 name: bytes,
1031 other: bytes,
1032 committer: Optional[bytes] = None,
1033 timestamp: Optional[int] = None,
1034 timezone: Optional[int] = None,
1035 message: Optional[bytes] = None,
1036 ) -> None:
1037 """Make a ref point at another ref.
1039 Args:
1040 name: Name of the ref to set
1041 other: Name of the ref to point at
1042 committer: Optional committer name
1043 timestamp: Optional timestamp
1044 timezone: Optional timezone
1045 message: Optional message to describe the change
1046 """
1047 self._check_refname(name)
1048 self._check_refname(other)
1049 filename = self.refpath(name)
1050 f = GitFile(filename, "wb")
1051 try:
1052 f.write(SYMREF + other + b"\n")
1053 sha = self.follow(name)[-1]
1054 self._log(
1055 name,
1056 sha,
1057 sha,
1058 committer=committer,
1059 timestamp=timestamp,
1060 timezone=timezone,
1061 message=message,
1062 )
1063 except BaseException:
1064 f.abort()
1065 raise
1066 else:
1067 f.close()
    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.
        Raises:
          NotADirectoryError: if an ancestor path of the target ref is itself
            a packed ref
        """
        self._check_refname(name)
        # Resolve symrefs: the write goes to the last name in the chain. If
        # resolution fails, fall back to writing `name` itself.
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock to handle race conditions
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        # No loose value: compare against the packed value,
                        # with ZERO_SHA standing in for a missing ref.
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise

            # Check if ref already has the desired value while holding the lock
            # This avoids fsync when ref is unchanged but still detects lock conflicts
            current_ref = self.read_loose_ref(realname)
            if current_ref is None:
                current_ref = packed_refs.get(realname, None)

            if current_ref is not None and current_ref == new_ref:
                # Ref already has desired value, abort write to avoid fsync
                f.abort()
                return True

            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            # The log entry names the resolved ref and records the caller's
            # old_ref (which may be None), not the value read above.
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True
def add_if_new(
    self,
    name: bytes,
    ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Add a new reference only if it does not already exist.

    This method follows symrefs, and only ensures that the last ref in the
    chain does not exist.

    Args:
      name: The refname to set.
      ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message for reflog
    Returns: True if the add was successful, False otherwise.
    Raises:
      OSError: Re-raised (after aborting the lock file) when writing the
        new value fails.
    """
    try:
        realnames, contents = self.follow(name)
        if contents is not None:
            return False
        realname = realnames[-1]
    except (KeyError, IndexError):
        realname = name
    self._check_refname(realname)
    filename = self.refpath(realname)
    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as f:
        # Re-check existence while holding the lock. The file being locked
        # and written is the *resolved* ref, so that is the name that must
        # be absent from packed refs; checking only ``name`` would miss a
        # packed ref at the end of a symref chain. ``name`` is still
        # checked so that nothing the previous check rejected is accepted.
        if os.path.exists(filename):
            already_present = True
        else:
            packed_refs = self.get_packed_refs()
            already_present = realname in packed_refs or name in packed_refs
        if already_present:
            f.abort()
            return False
        try:
            f.write(ref + b"\n")
        except OSError:
            f.abort()
            raise
        else:
            self._log(
                name,
                None,
                ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
    return True
def remove_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Remove a refname only if it currently equals old_ref.

    This method does not follow symbolic references. It can be used to
    perform an atomic compare-and-delete operation.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    filename = self.refpath(name)
    ensure_dir_exists(os.path.dirname(filename))
    # The lock file is opened only to hold the lock; it is always aborted
    # in the ``finally`` below, including on the early ``return False``.
    f = GitFile(filename, "wb")
    try:
        if old_ref is not None:
            orig_ref = self.read_loose_ref(name)
            if orig_ref is None:
                # No loose ref: compare against the packed value, treating
                # a missing ref as ZERO_SHA.
                orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
            if orig_ref != old_ref:
                return False

        # remove the reference file itself
        try:
            found = os.path.lexists(filename)
        except OSError:
            # may only be packed, or otherwise unstorable
            found = False

        if found:
            os.remove(filename)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        f.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    parent = name
    while True:
        try:
            parent, _ = parent.rsplit(b"/", 1)
        except ValueError:
            # no "/" left in the name: nothing further up to clean
            break

        if parent == b"refs":
            # never try to remove the top-level refs directory
            break
        parent_filename = self.refpath(parent)
        try:
            os.rmdir(parent_filename)
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True
def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    HEAD is never packed. Refs whose lookup raises KeyError, or whose
    value is falsy, are left alone.

    Args:
      all: If True, every ref is a candidate. If False, only refs under
        the local tag prefix are.
    """
    pending: dict[Ref, Optional[ObjectID]] = {}
    for refname in self.allkeys():
        if refname == HEADREF:
            # HEAD always stays loose
            continue
        if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            value = self[refname]
        except KeyError:
            # Broken ref, skip it
            continue
        if value:
            pending[refname] = value

    if pending:
        self.add_packed_refs(pending)
def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
    """Parse one ``<sha> <refname>`` line of a packed-refs file.

    Trailing newline / carriage-return characters are ignored.

    Returns: A ``(sha, name)`` tuple.
    Raises:
      PackedRefsException: If the line does not consist of exactly two
        space-separated fields, the first field is not a valid hex sha,
        or the second is not a well-formed ref name.
    """
    stripped = line.rstrip(b"\n\r")
    parts = stripped.split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    hexsha, refname = parts
    if not valid_hexsha(hexsha):
        raise PackedRefsException(f"Invalid hex sha {hexsha!r}")
    if not check_ref_format(refname):
        raise PackedRefsException(f"invalid ref name {refname!r}")
    return (hexsha, refname)
def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
    """Iterate over the entries of a packed refs file without peeled data.

    Lines starting with ``#`` are treated as comments and skipped. A line
    starting with ``^`` (a peeled entry) is an error here.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: If a peeled line is encountered or a ref line
        is malformed.
    """
    for raw_line in f:
        if raw_line.startswith(b"^"):
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        if not raw_line.startswith(b"#"):
            yield _split_ref_line(raw_line)
def read_packed_refs_with_peeled(
    f: IO[bytes],
) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of ``(sha, name, peeled_sha)`` where ``peeled_sha`` is None for
    refs that have no peeled line following them.

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: If a peeled line appears without a preceding
        ref line, a peeled sha is not valid hex, or a ref line is malformed.
    """
    # ``last`` holds a ref line whose emission is deferred until we know
    # whether the following line is its peeled ("^...") companion.
    last = None
    for line in f:
        # Indexing bytes yields an int on Python 3, so ``line[0] == b"#"``
        # never matched; use startswith so comment lines are really skipped.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            if not valid_hexsha(line[1:]):
                raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, line[1:])
        else:
            if last:
                # Previous ref had no peeled line.
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        # Flush the final pending ref.
        sha, name = _split_ref_line(last)
        yield (sha, name, None)
def write_packed_refs(
    f: IO[bytes],
    packed_refs: dict[bytes, bytes],
    peeled_refs: Optional[dict[bytes, bytes]] = None,
) -> None:
    """Serialise packed refs to a file.

    When ``peeled_refs`` is given (even if empty) a
    "# pack-refs with: peeled" header is emitted first. Refs are written in
    sorted name order, each followed by a ``^<sha>`` line when a peeled
    value is known for it.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    peeled_lookup = {} if peeled_refs is None else peeled_refs
    for refname in sorted(packed_refs):
        f.write(git_line(packed_refs[refname], refname))
        if refname in peeled_lookup:
            f.write(b"^" + peeled_lookup[refname] + b"\n")
def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
    """Parse an info/refs file.

    Each line is expected to be ``<sha>\\t<name>``; trailing CR/LF is
    stripped before splitting on the first tab.

    Args:
      f: File-like object to read from

    Returns:
      Dictionary mapping ref names to SHA1s
    """
    pairs = (entry.rstrip(b"\r\n").split(b"\t", 1) for entry in f.readlines())
    return {name: sha for sha, name in pairs}
def write_info_refs(
    refs: dict[bytes, bytes], store: ObjectContainer
) -> Iterator[bytes]:
    """Yield the lines of an info/refs file for the given refs.

    Refs are emitted in sorted order. HEAD and refs whose sha is missing
    from ``store`` are skipped. When peeling an object leads to a different
    object, an additional line carrying the peeled-tag suffix is emitted.

    Args:
      refs: Dictionary mapping ref names to SHAs
      store: Object container used to look up and peel objects
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for refname, sha in sorted(refs.items()):
        if refname == HEADREF:
            # get_refs() includes HEAD as a special case, but it must not
            # be advertised
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _unpeeled, peeled_obj = peel_sha(store, sha)
        yield obj.id + b"\t" + refname + b"\n"
        if obj.id != peeled_obj.id:
            yield peeled_obj.id + b"\t" + refname + PEELED_TAG_SUFFIX + b"\n"
def is_local_branch(x: bytes) -> bool:
    """Return True when the ref name starts with the local branch prefix."""
    branch_prefix = LOCAL_BRANCH_PREFIX
    return x.startswith(branch_prefix)
# Constrained type variable for ref mappings: either every value is a sha,
# or values may be None. Functions annotated with it return the same mapping
# flavour they were given.
T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])
def strip_peeled_refs(refs: T) -> T:
    """Return a copy of ``refs`` without entries carrying the peeled suffix."""
    kept = {}
    for refname, sha in refs.items():
        if refname.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[refname] = sha
    return kept
def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
    """Separate peeled entries from regular refs.

    Returns: A ``(regular, peeled)`` pair. ``regular`` holds every entry
      whose name lacks the peeled suffix; ``peeled`` maps the name with
      the suffix removed to its sha, omitting entries whose sha is None.
    """
    suffix_len = len(PEELED_TAG_SUFFIX)
    regular = {}
    peeled: dict[bytes, bytes] = {}
    for refname, sha in refs.items():
        if not refname.endswith(PEELED_TAG_SUFFIX):
            regular[refname] = sha
        elif sha is not None:
            # Only keep peeled values that actually point somewhere
            peeled[refname[:-suffix_len]] = sha

    return regular, peeled
def _set_origin_head(
    refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
) -> None:
    """Point ``refs/remotes/<origin>/HEAD`` at the remote's head branch.

    Nothing happens unless ``origin_head`` names a local branch and the
    corresponding remote-tracking ref is present in ``refs``.
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"
    if not origin_head:
        return
    if not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    branch_name = origin_head[len(LOCAL_BRANCH_PREFIX) :]
    symref_name = remote_prefix + HEADREF
    tracking_ref = remote_prefix + branch_name
    if tracking_ref in refs:
        refs.set_symbolic_ref(symref_name, tracking_ref)
def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    If ``branch`` is given it is looked up first as a remote-tracking
    branch of ``origin`` (creating the matching local branch if it is new)
    and then as a local tag. Otherwise ``origin_head`` is used, and the
    ref it names is created locally from the corresponding origin ref when
    that ref exists.

    Args:
      refs: Refs container to update
      origin: Name of the remote
      origin_head: Ref the remote's HEAD points at, if known
      branch: Explicitly requested branch or tag name (may be empty)
      ref_message: Reflog message for any ref that gets created
    Returns: The ref name that HEAD should be set to.
    Raises:
      ValueError: If ``branch`` is neither a remote branch nor a tag, or if
        neither ``branch`` nor ``origin_head`` is provided.
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            # ``message`` must be passed by keyword: the third positional
            # parameter of add_if_new is ``committer``.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif LOCAL_TAG_PREFIX + branch in refs:
            head_ref = LOCAL_TAG_PREFIX + branch
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The origin ref does not exist; nothing to create locally.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref
def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
) -> Optional[bytes]:
    """Update HEAD to follow a branch, or detach it at a tag.

    Returns: The value HEAD was set to, or None when ``head_ref`` is a
      branch and a KeyError was raised while updating HEAD.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            branch_value = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, branch_value, message=ref_message)
        except KeyError:
            return None
        return branch_value

    # detach HEAD at specified tag
    tag_value = refs[head_ref]
    # NOTE(review): refs lookups elsewhere in this file yield bytes, so this
    # Tag branch looks unreachable -- confirm before relying on it.
    if isinstance(tag_value, Tag):
        _kind, target = tag_value.object
        tag_value = target.get_object(target).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, tag_value, message=ref_message)
    return tag_value
def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, Optional[bytes]],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into a refs container.

    Branches are imported under ``refs/remotes/<remote_name>`` and tags
    under the local tag prefix. Peeled entries and entries whose value is
    None are ignored.

    Args:
      refs_container: Container receiving the refs
      remote_name: Name of the remote the refs came from
      refs: Refs advertised by the remote
      message: Optional reflog message
      prune: Passed through to ``import_refs`` for branches
      prune_tags: Passed through to ``import_refs`` for tags
    """
    stripped_refs = strip_peeled_refs(refs)

    branches = {}
    tags = {}
    for refname, sha in stripped_refs.items():
        if sha is None:
            continue
        if refname.startswith(LOCAL_BRANCH_PREFIX):
            branches[refname[len(LOCAL_BRANCH_PREFIX) :]] = sha
        if refname.startswith(LOCAL_TAG_PREFIX) and not refname.endswith(
            PEELED_TAG_SUFFIX
        ):
            tags[refname[len(LOCAL_TAG_PREFIX) :]] = sha

    remote_base = b"refs/remotes/" + remote_name.encode()
    refs_container.import_refs(
        remote_base,
        branches,
        message=message,
        prune=prune,
    )
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )
def serialize_refs(
    store: ObjectContainer, refs: dict[bytes, bytes]
) -> dict[bytes, bytes]:
    """Build a ref mapping that also carries peeled entries for tags.

    Refs whose sha cannot be found in ``store`` are dropped with a
    UserWarning.

    Args:
      store: Object store to peel refs from
      refs: Dictionary of ref names to SHAs

    Returns:
      Dictionary with refs and peeled refs (marked with ^{})
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for refname, sha in refs.items():
        try:
            unpeeled_obj, peeled_obj = peel_sha(store, sha)
        except KeyError:
            ref_text = refname.decode("utf-8", "replace")
            sha_text = sha.decode("ascii")
            warnings.warn(
                f"ref {ref_text} points at non-present sha {sha_text}",
                UserWarning,
            )
            continue
        if isinstance(unpeeled_obj, Tag):
            serialized[refname + PEELED_TAG_SUFFIX] = peeled_obj.id
        serialized[refname] = unpeeled_obj.id
    return serialized
class locked_ref:
    """Hold the lock on a ref for the duration of a ``with`` block.

    On a clean exit any value written through :meth:`set` or
    :meth:`set_symbolic_ref` is committed by closing the lock file; on an
    exception, or after :meth:`delete`, the lock file is aborted instead.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        """Initialize a locked ref.

        Args:
          refs_container: The DiskRefsContainer to lock the ref in
          refname: The ref name to lock
        """
        self._refs_container = refs_container
        self._refname = refname
        self._file: Optional[_GitFile] = None
        self._realname: Optional[Ref] = None
        self._deleted = False

    def _require_file(self) -> "_GitFile":
        """Return the open lock file, or fail if the lock is not held."""
        if not self._file:
            raise RuntimeError("locked_ref not in context")
        return self._file

    def __enter__(self) -> "locked_ref":
        """Enter the context manager and acquire the lock.

        The ref name is resolved through symrefs where possible; the lock
        is taken on the resolved name.

        Returns:
          This locked_ref instance

        Raises:
          OSError: If the lock cannot be acquired
        """
        container = self._refs_container
        container._check_refname(self._refname)
        try:
            realnames, _ = container.follow(self._refname)
            self._realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            self._realname = self._refname

        filename = container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(filename))
        self._file = GitFile(filename, "wb")
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        """Exit the context manager and release the lock.

        Args:
          exc_type: Type of exception if one occurred
          exc_value: Exception instance if one occurred
          traceback: Traceback if an exception occurred
        """
        lock_file = self._file
        if not lock_file:
            return
        if exc_type is None and not self._deleted:
            lock_file.close()
        else:
            lock_file.abort()

    def get(self) -> Optional[bytes]:
        """Get the current value of the ref."""
        self._require_file()

        assert self._realname is not None
        container = self._refs_container
        value = container.read_loose_ref(self._realname)
        if value is None:
            value = container.get_packed_refs().get(self._realname, None)
        return value

    def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
        """Ensure the ref currently equals the expected value.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        return self.get() == expected_value

    def set(self, new_ref: bytes) -> None:
        """Set the ref to a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value
        """
        lock_file = self._require_file()

        if not valid_hexsha(new_ref) and not new_ref.startswith(SYMREF):
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Replace whatever was previously staged in the lock file
        lock_file.seek(0)
        lock_file.truncate()
        lock_file.write(new_ref + b"\n")
        self._deleted = False

    def set_symbolic_ref(self, target: Ref) -> None:
        """Make this ref point at another ref.

        Args:
          target: Name of the ref to point at
        """
        lock_file = self._require_file()

        self._refs_container._check_refname(target)
        # Replace whatever was previously staged in the lock file
        lock_file.seek(0)
        lock_file.truncate()
        lock_file.write(SYMREF + target + b"\n")
        self._deleted = False

    def delete(self) -> None:
        """Delete the ref file while holding the lock."""
        self._require_file()

        # Delete the actual ref file while holding the lock
        if self._realname:
            container = self._refs_container
            filename = container.refpath(self._realname)
            with suppress(FileNotFoundError):
                if os.path.lexists(filename):
                    os.remove(filename)
            container._remove_packed_ref(self._realname)

        self._deleted = True
def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
    """Filter refs to only include those with a given prefix.

    Args:
      refs: A dictionary of refs.
      prefixes: The prefixes to filter by. Any iterable is accepted,
        including one-shot iterators and generators.
    Returns: A new dictionary holding the entries of ``refs`` whose name
      starts with at least one of ``prefixes``; empty if no prefixes are
      given.
    """
    # Materialise once: re-iterating ``prefixes`` for every key would
    # exhaust a one-shot iterator after the first ref and silently drop
    # all later matches. A tuple also lets startswith test every prefix in
    # a single call.
    wanted = tuple(prefixes)
    filtered = {k: v for k, v in refs.items() if k.startswith(wanted)}
    return cast(T, filtered)