Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import types
27import warnings
28from collections.abc import Iterable, Iterator, Mapping
29from contextlib import suppress
30from typing import (
31 IO,
32 TYPE_CHECKING,
33 Any,
34 BinaryIO,
35 Callable,
36 Optional,
37 TypeVar,
38 Union,
39)
41if TYPE_CHECKING:
42 from .file import _GitFile
44from .errors import PackedRefsException, RefFormatError
45from .file import GitFile, ensure_dir_exists
46from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
47from .pack import ObjectContainer
49Ref = bytes
51HEADREF = b"HEAD"
52SYMREF = b"ref: "
53LOCAL_BRANCH_PREFIX = b"refs/heads/"
54LOCAL_TAG_PREFIX = b"refs/tags/"
55LOCAL_REMOTE_PREFIX = b"refs/remotes/"
56LOCAL_NOTES_PREFIX = b"refs/notes/"
57LOCAL_REPLACE_PREFIX = b"refs/replace/"
58BAD_REF_CHARS = set(b"\177 ~^:?*[")
59PEELED_TAG_SUFFIX = b"^{}"
61# For backwards compatibility
62ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
65class SymrefLoop(Exception):
66 """There is a loop between one or more symrefs."""
68 def __init__(self, ref: bytes, depth: int) -> None:
69 """Initialize SymrefLoop exception."""
70 self.ref = ref
71 self.depth = depth
74def parse_symref_value(contents: bytes) -> bytes:
75 """Parse a symref value.
77 Args:
78 contents: Contents to parse
79 Returns: Destination
80 """
81 if contents.startswith(SYMREF):
82 return contents[len(SYMREF) :].rstrip(b"\r\n")
83 raise ValueError(contents)
86def check_ref_format(refname: Ref) -> bool:
87 """Check if a refname is correctly formatted.
89 Implements all the same rules as git-check-ref-format[1].
91 [1]
92 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
94 Args:
95 refname: The refname to check
96 Returns: True if refname is valid, False otherwise
97 """
98 # These could be combined into one big expression, but are listed
99 # separately to parallel [1].
100 if b"/." in refname or refname.startswith(b"."):
101 return False
102 if b"/" not in refname:
103 return False
104 if b".." in refname:
105 return False
106 for i, c in enumerate(refname):
107 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
108 return False
109 if refname[-1] in b"/.":
110 return False
111 if refname.endswith(b".lock"):
112 return False
113 if b"@{" in refname:
114 return False
115 if b"\\" in refname:
116 return False
117 return True
120def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
121 """Parse a remote ref into remote name and branch name.
123 Args:
124 ref: Remote ref like b"refs/remotes/origin/main"
126 Returns:
127 Tuple of (remote_name, branch_name)
129 Raises:
130 ValueError: If ref is not a valid remote ref
131 """
132 if not ref.startswith(LOCAL_REMOTE_PREFIX):
133 raise ValueError(f"Not a remote ref: {ref!r}")
135 # Remove the prefix
136 remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
138 # Split into remote name and branch name
139 parts = remainder.split(b"/", 1)
140 if len(parts) != 2:
141 raise ValueError(f"Invalid remote ref format: {ref!r}")
143 remote_name, branch_name = parts
144 return (remote_name, branch_name)
147class RefsContainer:
148 """A container for refs."""
150 def __init__(
151 self,
152 logger: Optional[
153 Callable[
154 [
155 bytes,
156 bytes,
157 bytes,
158 Optional[bytes],
159 Optional[int],
160 Optional[int],
161 bytes,
162 ],
163 None,
164 ]
165 ] = None,
166 ) -> None:
167 """Initialize RefsContainer with optional logger function."""
168 self._logger = logger
170 def _log(
171 self,
172 ref: bytes,
173 old_sha: Optional[bytes],
174 new_sha: Optional[bytes],
175 committer: Optional[bytes] = None,
176 timestamp: Optional[int] = None,
177 timezone: Optional[int] = None,
178 message: Optional[bytes] = None,
179 ) -> None:
180 if self._logger is None:
181 return
182 if message is None:
183 return
184 # Use ZERO_SHA for None values, matching git behavior
185 if old_sha is None:
186 old_sha = ZERO_SHA
187 if new_sha is None:
188 new_sha = ZERO_SHA
189 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
191 def set_symbolic_ref(
192 self,
193 name: bytes,
194 other: bytes,
195 committer: Optional[bytes] = None,
196 timestamp: Optional[int] = None,
197 timezone: Optional[int] = None,
198 message: Optional[bytes] = None,
199 ) -> None:
200 """Make a ref point at another ref.
202 Args:
203 name: Name of the ref to set
204 other: Name of the ref to point at
205 committer: Optional committer name/email
206 timestamp: Optional timestamp
207 timezone: Optional timezone
208 message: Optional message
209 """
210 raise NotImplementedError(self.set_symbolic_ref)
212 def get_packed_refs(self) -> dict[Ref, ObjectID]:
213 """Get contents of the packed-refs file.
215 Returns: Dictionary mapping ref names to SHA1s
217 Note: Will return an empty dictionary when no packed-refs file is
218 present.
219 """
220 raise NotImplementedError(self.get_packed_refs)
222 def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
223 """Add the given refs as packed refs.
225 Args:
226 new_refs: A mapping of ref names to targets; if a target is None that
227 means remove the ref
228 """
229 raise NotImplementedError(self.add_packed_refs)
231 def get_peeled(self, name: bytes) -> Optional[ObjectID]:
232 """Return the cached peeled value of a ref, if available.
234 Args:
235 name: Name of the ref to peel
236 Returns: The peeled value of the ref. If the ref is known not point to
237 a tag, this will be the SHA the ref refers to. If the ref may point
238 to a tag, but no cached information is available, None is returned.
239 """
240 return None
242 def import_refs(
243 self,
244 base: Ref,
245 other: Mapping[Ref, ObjectID],
246 committer: Optional[bytes] = None,
247 timestamp: Optional[bytes] = None,
248 timezone: Optional[bytes] = None,
249 message: Optional[bytes] = None,
250 prune: bool = False,
251 ) -> None:
252 """Import refs from another repository.
254 Args:
255 base: Base ref to import into (e.g., b'refs/remotes/origin')
256 other: Dictionary of refs to import
257 committer: Optional committer for reflog
258 timestamp: Optional timestamp for reflog
259 timezone: Optional timezone for reflog
260 message: Optional message for reflog
261 prune: If True, remove refs not in other
262 """
263 if prune:
264 to_delete = set(self.subkeys(base))
265 else:
266 to_delete = set()
267 for name, value in other.items():
268 if value is None:
269 to_delete.add(name)
270 else:
271 self.set_if_equals(
272 b"/".join((base, name)), None, value, message=message
273 )
274 if to_delete:
275 try:
276 to_delete.remove(name)
277 except KeyError:
278 pass
279 for ref in to_delete:
280 self.remove_if_equals(b"/".join((base, ref)), None, message=message)
282 def allkeys(self) -> set[Ref]:
283 """All refs present in this container."""
284 raise NotImplementedError(self.allkeys)
286 def __iter__(self) -> Iterator[Ref]:
287 """Iterate over all reference keys."""
288 return iter(self.allkeys())
290 def keys(self, base: Optional[bytes] = None) -> set[bytes]:
291 """Refs present in this container.
293 Args:
294 base: An optional base to return refs under.
295 Returns: An unsorted set of valid refs in this container, including
296 packed refs.
297 """
298 if base is not None:
299 return self.subkeys(base)
300 else:
301 return self.allkeys()
303 def subkeys(self, base: bytes) -> set[bytes]:
304 """Refs present in this container under a base.
306 Args:
307 base: The base to return refs under.
308 Returns: A set of valid refs in this container under the base; the base
309 prefix is stripped from the ref names returned.
310 """
311 keys = set()
312 base_len = len(base) + 1
313 for refname in self.allkeys():
314 if refname.startswith(base):
315 keys.add(refname[base_len:])
316 return keys
318 def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
319 """Return the contents of this container as a dictionary."""
320 ret = {}
321 keys = self.keys(base)
322 if base is None:
323 base = b""
324 else:
325 base = base.rstrip(b"/")
326 for key in keys:
327 try:
328 ret[key] = self[(base + b"/" + key).strip(b"/")]
329 except (SymrefLoop, KeyError):
330 continue # Unable to resolve
332 return ret
334 def _check_refname(self, name: bytes) -> None:
335 """Ensure a refname is valid and lives in refs or is HEAD.
337 HEAD is not a valid refname according to git-check-ref-format, but this
338 class needs to be able to touch HEAD. Also, check_ref_format expects
339 refnames without the leading 'refs/', but this class requires that
340 so it cannot touch anything outside the refs dir (or HEAD).
342 Args:
343 name: The name of the reference.
345 Raises:
346 KeyError: if a refname is not HEAD or is otherwise not valid.
347 """
348 if name in (HEADREF, b"refs/stash"):
349 return
350 if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
351 raise RefFormatError(name)
353 def read_ref(self, refname: bytes) -> Optional[bytes]:
354 """Read a reference without following any references.
356 Args:
357 refname: The name of the reference
358 Returns: The contents of the ref file, or None if it does
359 not exist.
360 """
361 contents = self.read_loose_ref(refname)
362 if not contents:
363 contents = self.get_packed_refs().get(refname, None)
364 return contents
366 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
367 """Read a loose reference and return its contents.
369 Args:
370 name: the refname to read
371 Returns: The contents of the ref file, or None if it does
372 not exist.
373 """
374 raise NotImplementedError(self.read_loose_ref)
376 def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
377 """Follow a reference name.
379 Returns: a tuple of (refnames, sha), wheres refnames are the names of
380 references in the chain
381 """
382 contents: Optional[bytes] = SYMREF + name
383 depth = 0
384 refnames = []
385 while contents and contents.startswith(SYMREF):
386 refname = contents[len(SYMREF) :]
387 refnames.append(refname)
388 contents = self.read_ref(refname)
389 if not contents:
390 break
391 depth += 1
392 if depth > 5:
393 raise SymrefLoop(name, depth)
394 return refnames, contents
396 def __contains__(self, refname: bytes) -> bool:
397 """Check if a reference exists."""
398 if self.read_ref(refname):
399 return True
400 return False
402 def __getitem__(self, name: bytes) -> ObjectID:
403 """Get the SHA1 for a reference name.
405 This method follows all symbolic references.
406 """
407 _, sha = self.follow(name)
408 if sha is None:
409 raise KeyError(name)
410 return sha
412 def set_if_equals(
413 self,
414 name: bytes,
415 old_ref: Optional[bytes],
416 new_ref: bytes,
417 committer: Optional[bytes] = None,
418 timestamp: Optional[int] = None,
419 timezone: Optional[int] = None,
420 message: Optional[bytes] = None,
421 ) -> bool:
422 """Set a refname to new_ref only if it currently equals old_ref.
424 This method follows all symbolic references if applicable for the
425 subclass, and can be used to perform an atomic compare-and-swap
426 operation.
428 Args:
429 name: The refname to set.
430 old_ref: The old sha the refname must refer to, or None to set
431 unconditionally.
432 new_ref: The new sha the refname will refer to.
433 committer: Optional committer name/email
434 timestamp: Optional timestamp
435 timezone: Optional timezone
436 message: Message for reflog
437 Returns: True if the set was successful, False otherwise.
438 """
439 raise NotImplementedError(self.set_if_equals)
441 def add_if_new(
442 self,
443 name: bytes,
444 ref: bytes,
445 committer: Optional[bytes] = None,
446 timestamp: Optional[int] = None,
447 timezone: Optional[int] = None,
448 message: Optional[bytes] = None,
449 ) -> bool:
450 """Add a new reference only if it does not already exist.
452 Args:
453 name: Ref name
454 ref: Ref value
455 committer: Optional committer name/email
456 timestamp: Optional timestamp
457 timezone: Optional timezone
458 message: Optional message for reflog
459 """
460 raise NotImplementedError(self.add_if_new)
462 def __setitem__(self, name: bytes, ref: bytes) -> None:
463 """Set a reference name to point to the given SHA1.
465 This method follows all symbolic references if applicable for the
466 subclass.
468 Note: This method unconditionally overwrites the contents of a
469 reference. To update atomically only if the reference has not
470 changed, use set_if_equals().
472 Args:
473 name: The refname to set.
474 ref: The new sha the refname will refer to.
475 """
476 if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
477 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
478 self.set_if_equals(name, None, ref)
480 def remove_if_equals(
481 self,
482 name: bytes,
483 old_ref: Optional[bytes],
484 committer: Optional[bytes] = None,
485 timestamp: Optional[int] = None,
486 timezone: Optional[int] = None,
487 message: Optional[bytes] = None,
488 ) -> bool:
489 """Remove a refname only if it currently equals old_ref.
491 This method does not follow symbolic references, even if applicable for
492 the subclass. It can be used to perform an atomic compare-and-delete
493 operation.
495 Args:
496 name: The refname to delete.
497 old_ref: The old sha the refname must refer to, or None to
498 delete unconditionally.
499 committer: Optional committer name/email
500 timestamp: Optional timestamp
501 timezone: Optional timezone
502 message: Message for reflog
503 Returns: True if the delete was successful, False otherwise.
504 """
505 raise NotImplementedError(self.remove_if_equals)
507 def __delitem__(self, name: bytes) -> None:
508 """Remove a refname.
510 This method does not follow symbolic references, even if applicable for
511 the subclass.
513 Note: This method unconditionally deletes the contents of a reference.
514 To delete atomically only if the reference has not changed, use
515 remove_if_equals().
517 Args:
518 name: The refname to delete.
519 """
520 self.remove_if_equals(name, None)
522 def get_symrefs(self) -> dict[bytes, bytes]:
523 """Get a dict with all symrefs in this container.
525 Returns: Dictionary mapping source ref to target ref
526 """
527 ret = {}
528 for src in self.allkeys():
529 try:
530 ref_value = self.read_ref(src)
531 assert ref_value is not None
532 dst = parse_symref_value(ref_value)
533 except ValueError:
534 pass
535 else:
536 ret[src] = dst
537 return ret
539 def pack_refs(self, all: bool = False) -> None:
540 """Pack loose refs into packed-refs file.
542 Args:
543 all: If True, pack all refs. If False, only pack tags.
544 """
545 raise NotImplementedError(self.pack_refs)
548class DictRefsContainer(RefsContainer):
549 """RefsContainer backed by a simple dict.
551 This container does not support symbolic or packed references and is not
552 threadsafe.
553 """
555 def __init__(
556 self,
557 refs: dict[bytes, bytes],
558 logger: Optional[
559 Callable[
560 [
561 bytes,
562 Optional[bytes],
563 Optional[bytes],
564 Optional[bytes],
565 Optional[int],
566 Optional[int],
567 Optional[bytes],
568 ],
569 None,
570 ]
571 ] = None,
572 ) -> None:
573 """Initialize DictRefsContainer with refs dictionary and optional logger."""
574 super().__init__(logger=logger)
575 self._refs = refs
576 self._peeled: dict[bytes, ObjectID] = {}
577 self._watchers: set[Any] = set()
579 def allkeys(self) -> set[bytes]:
580 """Return all reference keys."""
581 return set(self._refs.keys())
583 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
584 """Read a loose reference."""
585 return self._refs.get(name, None)
587 def get_packed_refs(self) -> dict[bytes, bytes]:
588 """Get packed references."""
589 return {}
591 def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
592 for watcher in self._watchers:
593 watcher._notify((ref, newsha))
595 def set_symbolic_ref(
596 self,
597 name: Ref,
598 other: Ref,
599 committer: Optional[bytes] = None,
600 timestamp: Optional[int] = None,
601 timezone: Optional[int] = None,
602 message: Optional[bytes] = None,
603 ) -> None:
604 """Make a ref point at another ref.
606 Args:
607 name: Name of the ref to set
608 other: Name of the ref to point at
609 committer: Optional committer name for reflog
610 timestamp: Optional timestamp for reflog
611 timezone: Optional timezone for reflog
612 message: Optional message for reflog
613 """
614 old = self.follow(name)[-1]
615 new = SYMREF + other
616 self._refs[name] = new
617 self._notify(name, new)
618 self._log(
619 name,
620 old,
621 new,
622 committer=committer,
623 timestamp=timestamp,
624 timezone=timezone,
625 message=message,
626 )
628 def set_if_equals(
629 self,
630 name: bytes,
631 old_ref: Optional[bytes],
632 new_ref: bytes,
633 committer: Optional[bytes] = None,
634 timestamp: Optional[int] = None,
635 timezone: Optional[int] = None,
636 message: Optional[bytes] = None,
637 ) -> bool:
638 """Set a refname to new_ref only if it currently equals old_ref.
640 This method follows all symbolic references, and can be used to perform
641 an atomic compare-and-swap operation.
643 Args:
644 name: The refname to set.
645 old_ref: The old sha the refname must refer to, or None to set
646 unconditionally.
647 new_ref: The new sha the refname will refer to.
648 committer: Optional committer name for reflog
649 timestamp: Optional timestamp for reflog
650 timezone: Optional timezone for reflog
651 message: Optional message for reflog
653 Returns:
654 True if the set was successful, False otherwise.
655 """
656 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
657 return False
658 # Only update the specific ref requested, not the whole chain
659 self._check_refname(name)
660 old = self._refs.get(name)
661 self._refs[name] = new_ref
662 self._notify(name, new_ref)
663 self._log(
664 name,
665 old,
666 new_ref,
667 committer=committer,
668 timestamp=timestamp,
669 timezone=timezone,
670 message=message,
671 )
672 return True
674 def add_if_new(
675 self,
676 name: Ref,
677 ref: ObjectID,
678 committer: Optional[bytes] = None,
679 timestamp: Optional[int] = None,
680 timezone: Optional[int] = None,
681 message: Optional[bytes] = None,
682 ) -> bool:
683 """Add a new reference only if it does not already exist.
685 Args:
686 name: Ref name
687 ref: Ref value
688 committer: Optional committer name for reflog
689 timestamp: Optional timestamp for reflog
690 timezone: Optional timezone for reflog
691 message: Optional message for reflog
693 Returns:
694 True if the add was successful, False otherwise.
695 """
696 if name in self._refs:
697 return False
698 self._refs[name] = ref
699 self._notify(name, ref)
700 self._log(
701 name,
702 None,
703 ref,
704 committer=committer,
705 timestamp=timestamp,
706 timezone=timezone,
707 message=message,
708 )
709 return True
711 def remove_if_equals(
712 self,
713 name: bytes,
714 old_ref: Optional[bytes],
715 committer: Optional[bytes] = None,
716 timestamp: Optional[int] = None,
717 timezone: Optional[int] = None,
718 message: Optional[bytes] = None,
719 ) -> bool:
720 """Remove a refname only if it currently equals old_ref.
722 This method does not follow symbolic references. It can be used to
723 perform an atomic compare-and-delete operation.
725 Args:
726 name: The refname to delete.
727 old_ref: The old sha the refname must refer to, or None to
728 delete unconditionally.
729 committer: Optional committer name for reflog
730 timestamp: Optional timestamp for reflog
731 timezone: Optional timezone for reflog
732 message: Optional message for reflog
734 Returns:
735 True if the delete was successful, False otherwise.
736 """
737 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
738 return False
739 try:
740 old = self._refs.pop(name)
741 except KeyError:
742 pass
743 else:
744 self._notify(name, None)
745 self._log(
746 name,
747 old,
748 None,
749 committer=committer,
750 timestamp=timestamp,
751 timezone=timezone,
752 message=message,
753 )
754 return True
756 def get_peeled(self, name: bytes) -> Optional[bytes]:
757 """Get peeled version of a reference."""
758 return self._peeled.get(name)
760 def _update(self, refs: Mapping[bytes, bytes]) -> None:
761 """Update multiple refs; intended only for testing."""
762 # TODO(dborowitz): replace this with a public function that uses
763 # set_if_equal.
764 for ref, sha in refs.items():
765 self.set_if_equals(ref, None, sha)
767 def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None:
768 """Update cached peeled refs; intended only for testing."""
769 self._peeled.update(peeled)
772class InfoRefsContainer(RefsContainer):
773 """Refs container that reads refs from a info/refs file."""
775 def __init__(self, f: BinaryIO) -> None:
776 """Initialize InfoRefsContainer from info/refs file."""
777 self._refs: dict[bytes, bytes] = {}
778 self._peeled: dict[bytes, bytes] = {}
779 refs = read_info_refs(f)
780 (self._refs, self._peeled) = split_peeled_refs(refs)
782 def allkeys(self) -> set[bytes]:
783 """Return all reference keys."""
784 return set(self._refs.keys())
786 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
787 """Read a loose reference."""
788 return self._refs.get(name, None)
790 def get_packed_refs(self) -> dict[bytes, bytes]:
791 """Get packed references."""
792 return {}
794 def get_peeled(self, name: bytes) -> Optional[bytes]:
795 """Get peeled version of a reference."""
796 try:
797 return self._peeled[name]
798 except KeyError:
799 return self._refs[name]
802class DiskRefsContainer(RefsContainer):
803 """Refs container that reads refs from disk."""
805 def __init__(
806 self,
807 path: Union[str, bytes, os.PathLike[str]],
808 worktree_path: Optional[Union[str, bytes, os.PathLike[str]]] = None,
809 logger: Optional[
810 Callable[
811 [
812 bytes,
813 bytes,
814 bytes,
815 Optional[bytes],
816 Optional[int],
817 Optional[int],
818 bytes,
819 ],
820 None,
821 ]
822 ] = None,
823 ) -> None:
824 """Initialize DiskRefsContainer."""
825 super().__init__(logger=logger)
826 # Convert path-like objects to strings, then to bytes for Git compatibility
827 self.path = os.fsencode(os.fspath(path))
828 if worktree_path is None:
829 self.worktree_path = self.path
830 else:
831 self.worktree_path = os.fsencode(os.fspath(worktree_path))
832 self._packed_refs: Optional[dict[bytes, bytes]] = None
833 self._peeled_refs: Optional[dict[bytes, bytes]] = None
835 def __repr__(self) -> str:
836 """Return string representation of DiskRefsContainer."""
837 return f"{self.__class__.__name__}({self.path!r})"
839 def _iter_dir(
840 self,
841 path: bytes,
842 base: bytes,
843 dir_filter: Optional[Callable[[bytes], bool]] = None,
844 ) -> Iterator[bytes]:
845 refspath = os.path.join(path, base.rstrip(b"/"))
846 prefix_len = len(os.path.join(path, b""))
848 for root, dirs, files in os.walk(refspath):
849 directory = root[prefix_len:]
850 if os.path.sep != "/":
851 directory = directory.replace(os.fsencode(os.path.sep), b"/")
852 if dir_filter is not None:
853 dirs[:] = [
854 d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
855 ]
857 for filename in files:
858 refname = b"/".join([directory, filename])
859 if check_ref_format(refname):
860 yield refname
862 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
863 base = base.rstrip(b"/") + b"/"
864 search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]] = []
865 if base != b"refs/":
866 path = self.worktree_path if is_per_worktree_ref(base) else self.path
867 search_paths.append((path, None))
868 elif self.worktree_path == self.path:
869 # Iterate through all the refs from the main worktree
870 search_paths.append((self.path, None))
871 else:
872 # Iterate through all the shared refs from the commondir, excluding per-worktree refs
873 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
874 # Iterate through all the per-worktree refs from the worktree's gitdir
875 search_paths.append((self.worktree_path, is_per_worktree_ref))
877 for path, dir_filter in search_paths:
878 yield from self._iter_dir(path, base, dir_filter=dir_filter)
880 def subkeys(self, base: bytes) -> set[bytes]:
881 """Return subkeys under a given base reference path."""
882 subkeys = set()
884 for key in self._iter_loose_refs(base):
885 if key.startswith(base):
886 subkeys.add(key[len(base) :].strip(b"/"))
888 for key in self.get_packed_refs():
889 if key.startswith(base):
890 subkeys.add(key[len(base) :].strip(b"/"))
891 return subkeys
893 def allkeys(self) -> set[bytes]:
894 """Return all reference keys."""
895 allkeys = set()
896 if os.path.exists(self.refpath(HEADREF)):
897 allkeys.add(HEADREF)
899 allkeys.update(self._iter_loose_refs())
900 allkeys.update(self.get_packed_refs())
901 return allkeys
903 def refpath(self, name: bytes) -> bytes:
904 """Return the disk path of a ref."""
905 path = name
906 if os.path.sep != "/":
907 path = path.replace(b"/", os.fsencode(os.path.sep))
909 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
910 return os.path.join(root_dir, path)
912 def get_packed_refs(self) -> dict[bytes, bytes]:
913 """Get contents of the packed-refs file.
915 Returns: Dictionary mapping ref names to SHA1s
917 Note: Will return an empty dictionary when no packed-refs file is
918 present.
919 """
920 # TODO: invalidate the cache on repacking
921 if self._packed_refs is None:
922 # set both to empty because we want _peeled_refs to be
923 # None if and only if _packed_refs is also None.
924 self._packed_refs = {}
925 self._peeled_refs = {}
926 path = os.path.join(self.path, b"packed-refs")
927 try:
928 f = GitFile(path, "rb")
929 except FileNotFoundError:
930 return {}
931 with f:
932 first_line = next(iter(f)).rstrip()
933 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
934 for sha, name, peeled in read_packed_refs_with_peeled(f):
935 self._packed_refs[name] = sha
936 if peeled:
937 self._peeled_refs[name] = peeled
938 else:
939 f.seek(0)
940 for sha, name in read_packed_refs(f):
941 self._packed_refs[name] = sha
942 return self._packed_refs
944 def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
945 """Add the given refs as packed refs.
947 Args:
948 new_refs: A mapping of ref names to targets; if a target is None that
949 means remove the ref
950 """
951 if not new_refs:
952 return
954 path = os.path.join(self.path, b"packed-refs")
956 with GitFile(path, "wb") as f:
957 # reread cached refs from disk, while holding the lock
958 packed_refs = self.get_packed_refs().copy()
960 for ref, target in new_refs.items():
961 # sanity check
962 if ref == HEADREF:
963 raise ValueError("cannot pack HEAD")
965 # remove any loose refs pointing to this one -- please
966 # note that this bypasses remove_if_equals as we don't
967 # want to affect packed refs in here
968 with suppress(OSError):
969 os.remove(self.refpath(ref))
971 if target is not None:
972 packed_refs[ref] = target
973 else:
974 packed_refs.pop(ref, None)
976 write_packed_refs(f, packed_refs, self._peeled_refs)
978 self._packed_refs = packed_refs
980 def get_peeled(self, name: bytes) -> Optional[bytes]:
981 """Return the cached peeled value of a ref, if available.
983 Args:
984 name: Name of the ref to peel
985 Returns: The peeled value of the ref. If the ref is known not point to
986 a tag, this will be the SHA the ref refers to. If the ref may point
987 to a tag, but no cached information is available, None is returned.
988 """
989 self.get_packed_refs()
990 if (
991 self._peeled_refs is None
992 or self._packed_refs is None
993 or name not in self._packed_refs
994 ):
995 # No cache: no peeled refs were read, or this ref is loose
996 return None
997 if name in self._peeled_refs:
998 return self._peeled_refs[name]
999 else:
1000 # Known not peelable
1001 return self[name]
1003 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
1004 """Read a reference file and return its contents.
1006 If the reference file a symbolic reference, only read the first line of
1007 the file. Otherwise, only read the first 40 bytes.
1009 Args:
1010 name: the refname to read, relative to refpath
1011 Returns: The contents of the ref file, or None if the file does not
1012 exist.
1014 Raises:
1015 IOError: if any other error occurs
1016 """
1017 filename = self.refpath(name)
1018 try:
1019 with GitFile(filename, "rb") as f:
1020 header = f.read(len(SYMREF))
1021 if header == SYMREF:
1022 # Read only the first line
1023 return header + next(iter(f)).rstrip(b"\r\n")
1024 else:
1025 # Read only the first 40 bytes
1026 return header + f.read(40 - len(SYMREF))
1027 except (OSError, UnicodeError):
1028 # don't assume anything specific about the error; in
1029 # particular, invalid or forbidden paths can raise weird
1030 # errors depending on the specific operating system
1031 return None
1033 def _remove_packed_ref(self, name: bytes) -> None:
1034 if self._packed_refs is None:
1035 return
1036 filename = os.path.join(self.path, b"packed-refs")
1037 # reread cached refs from disk, while holding the lock
1038 f = GitFile(filename, "wb")
1039 try:
1040 self._packed_refs = None
1041 self.get_packed_refs()
1043 if self._packed_refs is None or name not in self._packed_refs:
1044 f.abort()
1045 return
1047 del self._packed_refs[name]
1048 if self._peeled_refs is not None:
1049 with suppress(KeyError):
1050 del self._peeled_refs[name]
1051 write_packed_refs(f, self._packed_refs, self._peeled_refs)
1052 f.close()
1053 except BaseException:
1054 f.abort()
1055 raise
1057 def set_symbolic_ref(
1058 self,
1059 name: bytes,
1060 other: bytes,
1061 committer: Optional[bytes] = None,
1062 timestamp: Optional[int] = None,
1063 timezone: Optional[int] = None,
1064 message: Optional[bytes] = None,
1065 ) -> None:
1066 """Make a ref point at another ref.
1068 Args:
1069 name: Name of the ref to set
1070 other: Name of the ref to point at
1071 committer: Optional committer name
1072 timestamp: Optional timestamp
1073 timezone: Optional timezone
1074 message: Optional message to describe the change
1075 """
1076 self._check_refname(name)
1077 self._check_refname(other)
1078 filename = self.refpath(name)
1079 f = GitFile(filename, "wb")
1080 try:
1081 f.write(SYMREF + other + b"\n")
1082 sha = self.follow(name)[-1]
1083 self._log(
1084 name,
1085 sha,
1086 sha,
1087 committer=committer,
1088 timestamp=timestamp,
1089 timezone=timezone,
1090 message=message,
1091 )
1092 except BaseException:
1093 f.abort()
1094 raise
1095 else:
1096 f.close()
1098 def set_if_equals(
1099 self,
1100 name: bytes,
1101 old_ref: Optional[bytes],
1102 new_ref: bytes,
1103 committer: Optional[bytes] = None,
1104 timestamp: Optional[int] = None,
1105 timezone: Optional[int] = None,
1106 message: Optional[bytes] = None,
1107 ) -> bool:
1108 """Set a refname to new_ref only if it currently equals old_ref.
1110 This method follows all symbolic references, and can be used to perform
1111 an atomic compare-and-swap operation.
1113 Args:
1114 name: The refname to set.
1115 old_ref: The old sha the refname must refer to, or None to set
1116 unconditionally.
1117 new_ref: The new sha the refname will refer to.
1118 committer: Optional committer name
1119 timestamp: Optional timestamp
1120 timezone: Optional timezone
1121 message: Set message for reflog
1122 Returns: True if the set was successful, False otherwise.
1123 """
1124 self._check_refname(name)
1125 try:
1126 realnames, _ = self.follow(name)
1127 realname = realnames[-1]
1128 except (KeyError, IndexError, SymrefLoop):
1129 realname = name
1130 filename = self.refpath(realname)
1132 # make sure none of the ancestor folders is in packed refs
1133 probe_ref = os.path.dirname(realname)
1134 packed_refs = self.get_packed_refs()
1135 while probe_ref:
1136 if packed_refs.get(probe_ref, None) is not None:
1137 raise NotADirectoryError(filename)
1138 probe_ref = os.path.dirname(probe_ref)
1140 ensure_dir_exists(os.path.dirname(filename))
1141 with GitFile(filename, "wb") as f:
1142 if old_ref is not None:
1143 try:
1144 # read again while holding the lock to handle race conditions
1145 orig_ref = self.read_loose_ref(realname)
1146 if orig_ref is None:
1147 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
1148 if orig_ref != old_ref:
1149 f.abort()
1150 return False
1151 except OSError:
1152 f.abort()
1153 raise
1155 # Check if ref already has the desired value while holding the lock
1156 # This avoids fsync when ref is unchanged but still detects lock conflicts
1157 current_ref = self.read_loose_ref(realname)
1158 if current_ref is None:
1159 current_ref = packed_refs.get(realname, None)
1161 if current_ref is not None and current_ref == new_ref:
1162 # Ref already has desired value, abort write to avoid fsync
1163 f.abort()
1164 return True
1166 try:
1167 f.write(new_ref + b"\n")
1168 except OSError:
1169 f.abort()
1170 raise
1171 self._log(
1172 realname,
1173 old_ref,
1174 new_ref,
1175 committer=committer,
1176 timestamp=timestamp,
1177 timezone=timezone,
1178 message=message,
1179 )
1180 return True
1182 def add_if_new(
1183 self,
1184 name: bytes,
1185 ref: bytes,
1186 committer: Optional[bytes] = None,
1187 timestamp: Optional[int] = None,
1188 timezone: Optional[int] = None,
1189 message: Optional[bytes] = None,
1190 ) -> bool:
1191 """Add a new reference only if it does not already exist.
1193 This method follows symrefs, and only ensures that the last ref in the
1194 chain does not exist.
1196 Args:
1197 name: The refname to set.
1198 ref: The new sha the refname will refer to.
1199 committer: Optional committer name
1200 timestamp: Optional timestamp
1201 timezone: Optional timezone
1202 message: Optional message for reflog
1203 Returns: True if the add was successful, False otherwise.
1204 """
1205 try:
1206 realnames, contents = self.follow(name)
1207 if contents is not None:
1208 return False
1209 realname = realnames[-1]
1210 except (KeyError, IndexError):
1211 realname = name
1212 self._check_refname(realname)
1213 filename = self.refpath(realname)
1214 ensure_dir_exists(os.path.dirname(filename))
1215 with GitFile(filename, "wb") as f:
1216 if os.path.exists(filename) or name in self.get_packed_refs():
1217 f.abort()
1218 return False
1219 try:
1220 f.write(ref + b"\n")
1221 except OSError:
1222 f.abort()
1223 raise
1224 else:
1225 self._log(
1226 name,
1227 None,
1228 ref,
1229 committer=committer,
1230 timestamp=timestamp,
1231 timezone=timezone,
1232 message=message,
1233 )
1234 return True
1236 def remove_if_equals(
1237 self,
1238 name: bytes,
1239 old_ref: Optional[bytes],
1240 committer: Optional[bytes] = None,
1241 timestamp: Optional[int] = None,
1242 timezone: Optional[int] = None,
1243 message: Optional[bytes] = None,
1244 ) -> bool:
1245 """Remove a refname only if it currently equals old_ref.
1247 This method does not follow symbolic references. It can be used to
1248 perform an atomic compare-and-delete operation.
1250 Args:
1251 name: The refname to delete.
1252 old_ref: The old sha the refname must refer to, or None to
1253 delete unconditionally.
1254 committer: Optional committer name
1255 timestamp: Optional timestamp
1256 timezone: Optional timezone
1257 message: Optional message
1258 Returns: True if the delete was successful, False otherwise.
1259 """
1260 self._check_refname(name)
1261 filename = self.refpath(name)
1262 ensure_dir_exists(os.path.dirname(filename))
1263 f = GitFile(filename, "wb")
1264 try:
1265 if old_ref is not None:
1266 orig_ref = self.read_loose_ref(name)
1267 if orig_ref is None:
1268 orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
1269 if orig_ref != old_ref:
1270 return False
1272 # remove the reference file itself
1273 try:
1274 found = os.path.lexists(filename)
1275 except OSError:
1276 # may only be packed, or otherwise unstorable
1277 found = False
1279 if found:
1280 os.remove(filename)
1282 self._remove_packed_ref(name)
1283 self._log(
1284 name,
1285 old_ref,
1286 None,
1287 committer=committer,
1288 timestamp=timestamp,
1289 timezone=timezone,
1290 message=message,
1291 )
1292 finally:
1293 # never write, we just wanted the lock
1294 f.abort()
1296 # outside of the lock, clean-up any parent directory that might now
1297 # be empty. this ensures that re-creating a reference of the same
1298 # name of what was previously a directory works as expected
1299 parent = name
1300 while True:
1301 try:
1302 parent, _ = parent.rsplit(b"/", 1)
1303 except ValueError:
1304 break
1306 if parent == b"refs":
1307 break
1308 parent_filename = self.refpath(parent)
1309 try:
1310 os.rmdir(parent_filename)
1311 except OSError:
1312 # this can be caused by the parent directory being
1313 # removed by another process, being not empty, etc.
1314 # in any case, this is non fatal because we already
1315 # removed the reference, just ignore it
1316 break
1318 return True
1320 def pack_refs(self, all: bool = False) -> None:
1321 """Pack loose refs into packed-refs file.
1323 Args:
1324 all: If True, pack all refs. If False, only pack tags.
1325 """
1326 refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
1327 for ref in self.allkeys():
1328 if ref == HEADREF:
1329 # Never pack HEAD
1330 continue
1331 if all or ref.startswith(LOCAL_TAG_PREFIX):
1332 try:
1333 sha = self[ref]
1334 if sha:
1335 refs_to_pack[ref] = sha
1336 except KeyError:
1337 # Broken ref, skip it
1338 pass
1340 if refs_to_pack:
1341 self.add_packed_refs(refs_to_pack)
1344def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
1345 """Split a single ref line into a tuple of SHA1 and name."""
1346 fields = line.rstrip(b"\n\r").split(b" ")
1347 if len(fields) != 2:
1348 raise PackedRefsException(f"invalid ref line {line!r}")
1349 sha, name = fields
1350 if not valid_hexsha(sha):
1351 raise PackedRefsException(f"Invalid hex sha {sha!r}")
1352 if not check_ref_format(name):
1353 raise PackedRefsException(f"invalid ref name {name!r}")
1354 return (sha, name)
1357def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
1358 """Read a packed refs file.
1360 Args:
1361 f: file-like object to read from
1362 Returns: Iterator over tuples with SHA1s and ref names.
1363 """
1364 for line in f:
1365 if line.startswith(b"#"):
1366 # Comment
1367 continue
1368 if line.startswith(b"^"):
1369 raise PackedRefsException("found peeled ref in packed-refs without peeled")
1370 yield _split_ref_line(line)
1373def read_packed_refs_with_peeled(
1374 f: IO[bytes],
1375) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
1376 """Read a packed refs file including peeled refs.
1378 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
1379 with ref names, SHA1s, and peeled SHA1s (or None).
1381 Args:
1382 f: file-like object to read from, seek'ed to the second line
1383 """
1384 last = None
1385 for line in f:
1386 if line.startswith(b"#"):
1387 continue
1388 line = line.rstrip(b"\r\n")
1389 if line.startswith(b"^"):
1390 if not last:
1391 raise PackedRefsException("unexpected peeled ref line")
1392 if not valid_hexsha(line[1:]):
1393 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
1394 sha, name = _split_ref_line(last)
1395 last = None
1396 yield (sha, name, line[1:])
1397 else:
1398 if last:
1399 sha, name = _split_ref_line(last)
1400 yield (sha, name, None)
1401 last = line
1402 if last:
1403 sha, name = _split_ref_line(last)
1404 yield (sha, name, None)
1407def write_packed_refs(
1408 f: IO[bytes],
1409 packed_refs: Mapping[bytes, bytes],
1410 peeled_refs: Optional[Mapping[bytes, bytes]] = None,
1411) -> None:
1412 """Write a packed refs file.
1414 Args:
1415 f: empty file-like object to write to
1416 packed_refs: dict of refname to sha of packed refs to write
1417 peeled_refs: dict of refname to peeled value of sha
1418 """
1419 if peeled_refs is None:
1420 peeled_refs = {}
1421 else:
1422 f.write(b"# pack-refs with: peeled\n")
1423 for refname in sorted(packed_refs.keys()):
1424 f.write(git_line(packed_refs[refname], refname))
1425 if refname in peeled_refs:
1426 f.write(b"^" + peeled_refs[refname] + b"\n")
1429def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
1430 """Read info/refs file.
1432 Args:
1433 f: File-like object to read from
1435 Returns:
1436 Dictionary mapping ref names to SHA1s
1437 """
1438 ret = {}
1439 for line in f.readlines():
1440 (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
1441 ret[name] = sha
1442 return ret
1445def write_info_refs(
1446 refs: Mapping[bytes, bytes], store: ObjectContainer
1447) -> Iterator[bytes]:
1448 """Generate info refs."""
1449 # TODO: Avoid recursive import :(
1450 from .object_store import peel_sha
1452 for name, sha in sorted(refs.items()):
1453 # get_refs() includes HEAD as a special case, but we don't want to
1454 # advertise it
1455 if name == HEADREF:
1456 continue
1457 try:
1458 o = store[sha]
1459 except KeyError:
1460 continue
1461 _unpeeled, peeled = peel_sha(store, sha)
1462 yield o.id + b"\t" + name + b"\n"
1463 if o.id != peeled.id:
1464 yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
1467def is_local_branch(x: bytes) -> bool:
1468 """Check if a ref name is a local branch."""
1469 return x.startswith(LOCAL_BRANCH_PREFIX)
1472def local_branch_name(name: bytes) -> bytes:
1473 """Build a full branch ref from a short name.
1475 Args:
1476 name: Short branch name (e.g., b"master") or full ref
1478 Returns:
1479 Full branch ref name (e.g., b"refs/heads/master")
1481 Examples:
1482 >>> local_branch_name(b"master")
1483 b'refs/heads/master'
1484 >>> local_branch_name(b"refs/heads/master")
1485 b'refs/heads/master'
1486 """
1487 if name.startswith(LOCAL_BRANCH_PREFIX):
1488 return name
1489 return LOCAL_BRANCH_PREFIX + name
1492def local_tag_name(name: bytes) -> bytes:
1493 """Build a full tag ref from a short name.
1495 Args:
1496 name: Short tag name (e.g., b"v1.0") or full ref
1498 Returns:
1499 Full tag ref name (e.g., b"refs/tags/v1.0")
1501 Examples:
1502 >>> local_tag_name(b"v1.0")
1503 b'refs/tags/v1.0'
1504 >>> local_tag_name(b"refs/tags/v1.0")
1505 b'refs/tags/v1.0'
1506 """
1507 if name.startswith(LOCAL_TAG_PREFIX):
1508 return name
1509 return LOCAL_TAG_PREFIX + name
1512def local_replace_name(name: bytes) -> bytes:
1513 """Build a full replace ref from a short name.
1515 Args:
1516 name: Short replace name (object SHA) or full ref
1518 Returns:
1519 Full replace ref name (e.g., b"refs/replace/<sha>")
1521 Examples:
1522 >>> local_replace_name(b"abc123")
1523 b'refs/replace/abc123'
1524 >>> local_replace_name(b"refs/replace/abc123")
1525 b'refs/replace/abc123'
1526 """
1527 if name.startswith(LOCAL_REPLACE_PREFIX):
1528 return name
1529 return LOCAL_REPLACE_PREFIX + name
1532def extract_branch_name(ref: bytes) -> bytes:
1533 """Extract branch name from a full branch ref.
1535 Args:
1536 ref: Full branch ref (e.g., b"refs/heads/master")
1538 Returns:
1539 Short branch name (e.g., b"master")
1541 Raises:
1542 ValueError: If ref is not a local branch
1544 Examples:
1545 >>> extract_branch_name(b"refs/heads/master")
1546 b'master'
1547 >>> extract_branch_name(b"refs/heads/feature/foo")
1548 b'feature/foo'
1549 """
1550 if not ref.startswith(LOCAL_BRANCH_PREFIX):
1551 raise ValueError(f"Not a local branch ref: {ref!r}")
1552 return ref[len(LOCAL_BRANCH_PREFIX) :]
1555def extract_tag_name(ref: bytes) -> bytes:
1556 """Extract tag name from a full tag ref.
1558 Args:
1559 ref: Full tag ref (e.g., b"refs/tags/v1.0")
1561 Returns:
1562 Short tag name (e.g., b"v1.0")
1564 Raises:
1565 ValueError: If ref is not a local tag
1567 Examples:
1568 >>> extract_tag_name(b"refs/tags/v1.0")
1569 b'v1.0'
1570 """
1571 if not ref.startswith(LOCAL_TAG_PREFIX):
1572 raise ValueError(f"Not a local tag ref: {ref!r}")
1573 return ref[len(LOCAL_TAG_PREFIX) :]
1576def shorten_ref_name(ref: bytes) -> bytes:
1577 """Convert a full ref name to its short form.
1579 Args:
1580 ref: Full ref name (e.g., b"refs/heads/master")
1582 Returns:
1583 Short ref name (e.g., b"master")
1585 Examples:
1586 >>> shorten_ref_name(b"refs/heads/master")
1587 b'master'
1588 >>> shorten_ref_name(b"refs/remotes/origin/main")
1589 b'origin/main'
1590 >>> shorten_ref_name(b"refs/tags/v1.0")
1591 b'v1.0'
1592 >>> shorten_ref_name(b"HEAD")
1593 b'HEAD'
1594 """
1595 if ref.startswith(LOCAL_BRANCH_PREFIX):
1596 return ref[len(LOCAL_BRANCH_PREFIX) :]
1597 elif ref.startswith(LOCAL_REMOTE_PREFIX):
1598 return ref[len(LOCAL_REMOTE_PREFIX) :]
1599 elif ref.startswith(LOCAL_TAG_PREFIX):
1600 return ref[len(LOCAL_TAG_PREFIX) :]
1601 return ref
1604T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])
1607def strip_peeled_refs(refs: T) -> T:
1608 """Remove all peeled refs."""
1609 return {
1610 ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
1611 }
1614def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
1615 """Split peeled refs from regular refs."""
1616 peeled: dict[bytes, bytes] = {}
1617 regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)}
1619 for ref, sha in refs.items():
1620 if ref.endswith(PEELED_TAG_SUFFIX):
1621 # Only add to peeled dict if sha is not None
1622 if sha is not None:
1623 peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
1625 return regular, peeled
1628def _set_origin_head(
1629 refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
1630) -> None:
1631 # set refs/remotes/origin/HEAD
1632 origin_base = b"refs/remotes/" + origin + b"/"
1633 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
1634 origin_ref = origin_base + HEADREF
1635 target_ref = origin_base + extract_branch_name(origin_head)
1636 if target_ref in refs:
1637 refs.set_symbolic_ref(origin_ref, target_ref)
1640def _set_default_branch(
1641 refs: RefsContainer,
1642 origin: bytes,
1643 origin_head: Optional[bytes],
1644 branch: Optional[bytes],
1645 ref_message: Optional[bytes],
1646) -> bytes:
1647 """Set the default branch."""
1648 origin_base = b"refs/remotes/" + origin + b"/"
1649 if branch:
1650 origin_ref = origin_base + branch
1651 if origin_ref in refs:
1652 local_ref = local_branch_name(branch)
1653 refs.add_if_new(local_ref, refs[origin_ref], ref_message)
1654 head_ref = local_ref
1655 elif local_tag_name(branch) in refs:
1656 head_ref = local_tag_name(branch)
1657 else:
1658 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
1659 elif origin_head:
1660 head_ref = origin_head
1661 if origin_head.startswith(LOCAL_BRANCH_PREFIX):
1662 origin_ref = origin_base + extract_branch_name(origin_head)
1663 else:
1664 origin_ref = origin_head
1665 try:
1666 refs.add_if_new(head_ref, refs[origin_ref], ref_message)
1667 except KeyError:
1668 pass
1669 else:
1670 raise ValueError("neither origin_head nor branch are provided")
1671 return head_ref
1674def _set_head(
1675 refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
1676) -> Optional[bytes]:
1677 if head_ref.startswith(LOCAL_TAG_PREFIX):
1678 # detach HEAD at specified tag
1679 head = refs[head_ref]
1680 if isinstance(head, Tag):
1681 _cls, obj = head.object
1682 head = obj.get_object(obj).id
1683 del refs[HEADREF]
1684 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1685 else:
1686 # set HEAD to specific branch
1687 try:
1688 head = refs[head_ref]
1689 refs.set_symbolic_ref(HEADREF, head_ref)
1690 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1691 except KeyError:
1692 head = None
1693 return head
1696def _import_remote_refs(
1697 refs_container: RefsContainer,
1698 remote_name: str,
1699 refs: dict[bytes, Optional[bytes]],
1700 message: Optional[bytes] = None,
1701 prune: bool = False,
1702 prune_tags: bool = False,
1703) -> None:
1704 stripped_refs = strip_peeled_refs(refs)
1705 branches = {
1706 extract_branch_name(n): v
1707 for (n, v) in stripped_refs.items()
1708 if n.startswith(LOCAL_BRANCH_PREFIX) and v is not None
1709 }
1710 refs_container.import_refs(
1711 b"refs/remotes/" + remote_name.encode(),
1712 branches,
1713 message=message,
1714 prune=prune,
1715 )
1716 tags = {
1717 extract_tag_name(n): v
1718 for (n, v) in stripped_refs.items()
1719 if n.startswith(LOCAL_TAG_PREFIX)
1720 and not n.endswith(PEELED_TAG_SUFFIX)
1721 and v is not None
1722 }
1723 refs_container.import_refs(
1724 LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
1725 )
1728def serialize_refs(
1729 store: ObjectContainer, refs: Mapping[bytes, bytes]
1730) -> dict[bytes, bytes]:
1731 """Serialize refs with peeled refs.
1733 Args:
1734 store: Object store to peel refs from
1735 refs: Dictionary of ref names to SHAs
1737 Returns:
1738 Dictionary with refs and peeled refs (marked with ^{})
1739 """
1740 # TODO: Avoid recursive import :(
1741 from .object_store import peel_sha
1743 ret = {}
1744 for ref, sha in refs.items():
1745 try:
1746 unpeeled, peeled = peel_sha(store, sha)
1747 except KeyError:
1748 warnings.warn(
1749 "ref {} points at non-present sha {}".format(
1750 ref.decode("utf-8", "replace"), sha.decode("ascii")
1751 ),
1752 UserWarning,
1753 )
1754 continue
1755 else:
1756 if isinstance(unpeeled, Tag):
1757 ret[ref + PEELED_TAG_SUFFIX] = peeled.id
1758 ret[ref] = unpeeled.id
1759 return ret
1762class locked_ref:
1763 """Lock a ref while making modifications.
1765 Works as a context manager.
1766 """
1768 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
1769 """Initialize a locked ref.
1771 Args:
1772 refs_container: The DiskRefsContainer to lock the ref in
1773 refname: The ref name to lock
1774 """
1775 self._refs_container = refs_container
1776 self._refname = refname
1777 self._file: Optional[_GitFile] = None
1778 self._realname: Optional[Ref] = None
1779 self._deleted = False
1781 def __enter__(self) -> "locked_ref":
1782 """Enter the context manager and acquire the lock.
1784 Returns:
1785 This locked_ref instance
1787 Raises:
1788 OSError: If the lock cannot be acquired
1789 """
1790 self._refs_container._check_refname(self._refname)
1791 try:
1792 realnames, _ = self._refs_container.follow(self._refname)
1793 self._realname = realnames[-1]
1794 except (KeyError, IndexError, SymrefLoop):
1795 self._realname = self._refname
1797 filename = self._refs_container.refpath(self._realname)
1798 ensure_dir_exists(os.path.dirname(filename))
1799 f = GitFile(filename, "wb")
1800 self._file = f
1801 return self
1803 def __exit__(
1804 self,
1805 exc_type: Optional[type],
1806 exc_value: Optional[BaseException],
1807 traceback: Optional[types.TracebackType],
1808 ) -> None:
1809 """Exit the context manager and release the lock.
1811 Args:
1812 exc_type: Type of exception if one occurred
1813 exc_value: Exception instance if one occurred
1814 traceback: Traceback if an exception occurred
1815 """
1816 if self._file:
1817 if exc_type is not None or self._deleted:
1818 self._file.abort()
1819 else:
1820 self._file.close()
1822 def get(self) -> Optional[bytes]:
1823 """Get the current value of the ref."""
1824 if not self._file:
1825 raise RuntimeError("locked_ref not in context")
1827 assert self._realname is not None
1828 current_ref = self._refs_container.read_loose_ref(self._realname)
1829 if current_ref is None:
1830 current_ref = self._refs_container.get_packed_refs().get(
1831 self._realname, None
1832 )
1833 return current_ref
1835 def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
1836 """Ensure the ref currently equals the expected value.
1838 Args:
1839 expected_value: The expected current value of the ref
1840 Returns:
1841 True if the ref equals the expected value, False otherwise
1842 """
1843 current_value = self.get()
1844 return current_value == expected_value
1846 def set(self, new_ref: bytes) -> None:
1847 """Set the ref to a new value.
1849 Args:
1850 new_ref: The new SHA1 or symbolic ref value
1851 """
1852 if not self._file:
1853 raise RuntimeError("locked_ref not in context")
1855 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
1856 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
1858 self._file.seek(0)
1859 self._file.truncate()
1860 self._file.write(new_ref + b"\n")
1861 self._deleted = False
1863 def set_symbolic_ref(self, target: Ref) -> None:
1864 """Make this ref point at another ref.
1866 Args:
1867 target: Name of the ref to point at
1868 """
1869 if not self._file:
1870 raise RuntimeError("locked_ref not in context")
1872 self._refs_container._check_refname(target)
1873 self._file.seek(0)
1874 self._file.truncate()
1875 self._file.write(SYMREF + target + b"\n")
1876 self._deleted = False
1878 def delete(self) -> None:
1879 """Delete the ref file while holding the lock."""
1880 if not self._file:
1881 raise RuntimeError("locked_ref not in context")
1883 # Delete the actual ref file while holding the lock
1884 if self._realname:
1885 filename = self._refs_container.refpath(self._realname)
1886 try:
1887 if os.path.lexists(filename):
1888 os.remove(filename)
1889 except FileNotFoundError:
1890 pass
1891 self._refs_container._remove_packed_ref(self._realname)
1893 self._deleted = True
1896class NamespacedRefsContainer(RefsContainer):
1897 """Wrapper that adds namespace prefix to all ref operations.
1899 This implements Git's GIT_NAMESPACE feature, which stores refs under
1900 refs/namespaces/<namespace>/ and filters operations to only show refs
1901 within that namespace.
1903 Example:
1904 With namespace "foo", a ref "refs/heads/master" is stored as
1905 "refs/namespaces/foo/refs/heads/master" in the underlying container.
1906 """
1908 def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
1909 """Initialize NamespacedRefsContainer.
1911 Args:
1912 refs: The underlying refs container to wrap
1913 namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
1914 """
1915 super().__init__(logger=refs._logger)
1916 self._refs = refs
1917 # Build namespace prefix: refs/namespaces/<namespace>/
1918 # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
1919 namespace_parts = namespace.split(b"/")
1920 self._namespace_prefix = b""
1921 for part in namespace_parts:
1922 self._namespace_prefix += b"refs/namespaces/" + part + b"/"
1924 def _apply_namespace(self, name: bytes) -> bytes:
1925 """Apply namespace prefix to a ref name."""
1926 # HEAD and other special refs are not namespaced
1927 if name == HEADREF or not name.startswith(b"refs/"):
1928 return name
1929 return self._namespace_prefix + name
1931 def _strip_namespace(self, name: bytes) -> Optional[bytes]:
1932 """Remove namespace prefix from a ref name.
1934 Returns None if the ref is not in our namespace.
1935 """
1936 # HEAD and other special refs are not namespaced
1937 if name == HEADREF or not name.startswith(b"refs/"):
1938 return name
1939 if name.startswith(self._namespace_prefix):
1940 return name[len(self._namespace_prefix) :]
1941 return None
1943 def allkeys(self) -> set[bytes]:
1944 """Return all reference keys in this namespace."""
1945 keys = set()
1946 for key in self._refs.allkeys():
1947 stripped = self._strip_namespace(key)
1948 if stripped is not None:
1949 keys.add(stripped)
1950 return keys
1952 def read_loose_ref(self, name: bytes) -> Optional[bytes]:
1953 """Read a loose reference."""
1954 return self._refs.read_loose_ref(self._apply_namespace(name))
1956 def get_packed_refs(self) -> dict[Ref, ObjectID]:
1957 """Get packed refs within this namespace."""
1958 packed = {}
1959 for name, value in self._refs.get_packed_refs().items():
1960 stripped = self._strip_namespace(name)
1961 if stripped is not None:
1962 packed[stripped] = value
1963 return packed
1965 def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
1966 """Add packed refs with namespace prefix."""
1967 namespaced_refs = {
1968 self._apply_namespace(name): value for name, value in new_refs.items()
1969 }
1970 self._refs.add_packed_refs(namespaced_refs)
1972 def get_peeled(self, name: bytes) -> Optional[ObjectID]:
1973 """Return the cached peeled value of a ref."""
1974 return self._refs.get_peeled(self._apply_namespace(name))
1976 def set_symbolic_ref(
1977 self,
1978 name: bytes,
1979 other: bytes,
1980 committer: Optional[bytes] = None,
1981 timestamp: Optional[int] = None,
1982 timezone: Optional[int] = None,
1983 message: Optional[bytes] = None,
1984 ) -> None:
1985 """Make a ref point at another ref."""
1986 self._refs.set_symbolic_ref(
1987 self._apply_namespace(name),
1988 self._apply_namespace(other),
1989 committer=committer,
1990 timestamp=timestamp,
1991 timezone=timezone,
1992 message=message,
1993 )
1995 def set_if_equals(
1996 self,
1997 name: bytes,
1998 old_ref: Optional[bytes],
1999 new_ref: bytes,
2000 committer: Optional[bytes] = None,
2001 timestamp: Optional[int] = None,
2002 timezone: Optional[int] = None,
2003 message: Optional[bytes] = None,
2004 ) -> bool:
2005 """Set a refname to new_ref only if it currently equals old_ref."""
2006 return self._refs.set_if_equals(
2007 self._apply_namespace(name),
2008 old_ref,
2009 new_ref,
2010 committer=committer,
2011 timestamp=timestamp,
2012 timezone=timezone,
2013 message=message,
2014 )
2016 def add_if_new(
2017 self,
2018 name: bytes,
2019 ref: bytes,
2020 committer: Optional[bytes] = None,
2021 timestamp: Optional[int] = None,
2022 timezone: Optional[int] = None,
2023 message: Optional[bytes] = None,
2024 ) -> bool:
2025 """Add a new reference only if it does not already exist."""
2026 return self._refs.add_if_new(
2027 self._apply_namespace(name),
2028 ref,
2029 committer=committer,
2030 timestamp=timestamp,
2031 timezone=timezone,
2032 message=message,
2033 )
2035 def remove_if_equals(
2036 self,
2037 name: bytes,
2038 old_ref: Optional[bytes],
2039 committer: Optional[bytes] = None,
2040 timestamp: Optional[int] = None,
2041 timezone: Optional[int] = None,
2042 message: Optional[bytes] = None,
2043 ) -> bool:
2044 """Remove a refname only if it currently equals old_ref."""
2045 return self._refs.remove_if_equals(
2046 self._apply_namespace(name),
2047 old_ref,
2048 committer=committer,
2049 timestamp=timestamp,
2050 timezone=timezone,
2051 message=message,
2052 )
2054 def pack_refs(self, all: bool = False) -> None:
2055 """Pack loose refs into packed-refs file.
2057 Note: This packs all refs in the underlying container, not just
2058 those in the namespace.
2059 """
2060 self._refs.pack_refs(all=all)
2063def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
2064 """Filter refs to only include those with a given prefix.
2066 Args:
2067 refs: A dictionary of refs.
2068 prefixes: The prefixes to filter by.
2069 """
2070 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
2071 return filtered
2074def is_per_worktree_ref(ref: bytes) -> bool:
2075 """Returns whether a reference is stored per worktree or not.
2077 Per-worktree references are:
2078 - all pseudorefs, e.g. HEAD
2079 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
2081 All refs starting with "refs/" are shared, except for the ones listed above.
2083 See https://git-scm.com/docs/git-worktree#_refs.
2084 """
2085 return not ref.startswith(b"refs/") or ref.startswith(
2086 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
2087 )