Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25__all__ = [
26 "HEADREF",
27 "LOCAL_BRANCH_PREFIX",
28 "LOCAL_NOTES_PREFIX",
29 "LOCAL_REMOTE_PREFIX",
30 "LOCAL_REPLACE_PREFIX",
31 "LOCAL_TAG_PREFIX",
32 "SYMREF",
33 "DictRefsContainer",
34 "DiskRefsContainer",
35 "NamespacedRefsContainer",
36 "Ref",
37 "RefsContainer",
38 "SymrefLoop",
39 "check_ref_format",
40 "extract_branch_name",
41 "extract_tag_name",
42 "filter_ref_prefix",
43 "is_local_branch",
44 "is_per_worktree_ref",
45 "local_branch_name",
46 "local_replace_name",
47 "local_tag_name",
48 "parse_remote_ref",
49 "parse_symref_value",
50 "read_info_refs",
51 "read_packed_refs",
52 "read_packed_refs_with_peeled",
53 "set_ref_from_raw",
54 "shorten_ref_name",
55 "write_packed_refs",
56]
58import os
59import types
60from collections.abc import Callable, Iterable, Iterator, Mapping
61from contextlib import suppress
62from typing import (
63 IO,
64 TYPE_CHECKING,
65 Any,
66 BinaryIO,
67 NewType,
68 TypeVar,
69)
71if TYPE_CHECKING:
72 from .file import _GitFile
74from .errors import PackedRefsException, RefFormatError
75from .file import GitFile, ensure_dir_exists
76from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
78Ref = NewType("Ref", bytes)
80T = TypeVar("T", dict[Ref, ObjectID], dict[Ref, ObjectID | None])
82HEADREF = Ref(b"HEAD")
83SYMREF = b"ref: "
84LOCAL_BRANCH_PREFIX = b"refs/heads/"
85LOCAL_TAG_PREFIX = b"refs/tags/"
86LOCAL_REMOTE_PREFIX = b"refs/remotes/"
87LOCAL_NOTES_PREFIX = b"refs/notes/"
88LOCAL_REPLACE_PREFIX = b"refs/replace/"
89BAD_REF_CHARS: set[int] = set(b"\177 ~^:?*[")
92class SymrefLoop(Exception):
93 """There is a loop between one or more symrefs."""
95 def __init__(self, ref: bytes, depth: int) -> None:
96 """Initialize SymrefLoop exception."""
97 self.ref = ref
98 self.depth = depth
101def parse_symref_value(contents: bytes) -> bytes:
102 """Parse a symref value.
104 Args:
105 contents: Contents to parse
106 Returns: Destination
107 """
108 if contents.startswith(SYMREF):
109 return contents[len(SYMREF) :].rstrip(b"\r\n")
110 raise ValueError(contents)
113def check_ref_format(refname: Ref) -> bool:
114 """Check if a refname is correctly formatted.
116 Implements all the same rules as git-check-ref-format[1].
118 [1]
119 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
121 Args:
122 refname: The refname to check
123 Returns: True if refname is valid, False otherwise
124 """
125 # These could be combined into one big expression, but are listed
126 # separately to parallel [1].
127 if b"/." in refname or refname.startswith(b"."): # type: ignore[comparison-overlap]
128 return False
129 if b"/" not in refname: # type: ignore[comparison-overlap]
130 return False
131 if b".." in refname: # type: ignore[comparison-overlap]
132 return False
133 for i, c in enumerate(refname):
134 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
135 return False
136 if refname[-1] in b"/.":
137 return False
138 if refname.endswith(b".lock"):
139 return False
140 if b"@{" in refname: # type: ignore[comparison-overlap]
141 return False
142 if b"\\" in refname: # type: ignore[comparison-overlap]
143 return False
144 return True
147def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
148 """Parse a remote ref into remote name and branch name.
150 Args:
151 ref: Remote ref like b"refs/remotes/origin/main"
153 Returns:
154 Tuple of (remote_name, branch_name)
156 Raises:
157 ValueError: If ref is not a valid remote ref
158 """
159 if not ref.startswith(LOCAL_REMOTE_PREFIX):
160 raise ValueError(f"Not a remote ref: {ref!r}")
162 # Remove the prefix
163 remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
165 # Split into remote name and branch name
166 parts = remainder.split(b"/", 1)
167 if len(parts) != 2:
168 raise ValueError(f"Invalid remote ref format: {ref!r}")
170 remote_name, branch_name = parts
171 return (remote_name, branch_name)
174def set_ref_from_raw(refs: "RefsContainer", name: Ref, raw_ref: bytes) -> None:
175 """Set a reference from a raw ref value.
177 This handles both symbolic refs (starting with 'ref: ') and direct ObjectID refs.
179 Args:
180 refs: The RefsContainer to set the ref in
181 name: The ref name to set
182 raw_ref: The raw ref value (either a symbolic ref or an ObjectID)
183 """
184 if raw_ref.startswith(SYMREF):
185 # It's a symbolic ref
186 target = Ref(raw_ref[len(SYMREF) :])
187 refs.set_symbolic_ref(name, target)
188 else:
189 # It's a direct ObjectID
190 refs[name] = ObjectID(raw_ref)
193class RefsContainer:
194 """A container for refs."""
196 def __init__(
197 self,
198 logger: Callable[
199 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
200 ]
201 | None = None,
202 ) -> None:
203 """Initialize RefsContainer with optional logger function."""
204 self._logger = logger
206 def _log(
207 self,
208 ref: bytes,
209 old_sha: bytes | None,
210 new_sha: bytes | None,
211 committer: bytes | None = None,
212 timestamp: int | None = None,
213 timezone: int | None = None,
214 message: bytes | None = None,
215 ) -> None:
216 if self._logger is None:
217 return
218 if message is None:
219 return
220 # Use ZERO_SHA for None values, matching git behavior
221 if old_sha is None:
222 old_sha = ZERO_SHA
223 if new_sha is None:
224 new_sha = ZERO_SHA
225 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
227 def set_symbolic_ref(
228 self,
229 name: Ref,
230 other: Ref,
231 committer: bytes | None = None,
232 timestamp: int | None = None,
233 timezone: int | None = None,
234 message: bytes | None = None,
235 ) -> None:
236 """Make a ref point at another ref.
238 Args:
239 name: Name of the ref to set
240 other: Name of the ref to point at
241 committer: Optional committer name/email
242 timestamp: Optional timestamp
243 timezone: Optional timezone
244 message: Optional message
245 """
246 raise NotImplementedError(self.set_symbolic_ref)
248 def get_packed_refs(self) -> dict[Ref, ObjectID]:
249 """Get contents of the packed-refs file.
251 Returns: Dictionary mapping ref names to SHA1s
253 Note: Will return an empty dictionary when no packed-refs file is
254 present.
255 """
256 raise NotImplementedError(self.get_packed_refs)
258 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
259 """Add the given refs as packed refs.
261 Args:
262 new_refs: A mapping of ref names to targets; if a target is None that
263 means remove the ref
264 """
265 raise NotImplementedError(self.add_packed_refs)
267 def get_peeled(self, name: Ref) -> ObjectID | None:
268 """Return the cached peeled value of a ref, if available.
270 Args:
271 name: Name of the ref to peel
272 Returns: The peeled value of the ref. If the ref is known not point to
273 a tag, this will be the SHA the ref refers to. If the ref may point
274 to a tag, but no cached information is available, None is returned.
275 """
276 return None
278 def import_refs(
279 self,
280 base: Ref,
281 other: Mapping[Ref, ObjectID | None],
282 committer: bytes | None = None,
283 timestamp: bytes | None = None,
284 timezone: bytes | None = None,
285 message: bytes | None = None,
286 prune: bool = False,
287 ) -> None:
288 """Import refs from another repository.
290 Args:
291 base: Base ref to import into (e.g., b'refs/remotes/origin')
292 other: Dictionary of refs to import
293 committer: Optional committer for reflog
294 timestamp: Optional timestamp for reflog
295 timezone: Optional timezone for reflog
296 message: Optional message for reflog
297 prune: If True, remove refs not in other
298 """
299 if prune:
300 to_delete = set(self.subkeys(base))
301 else:
302 to_delete = set()
303 for name, value in other.items():
304 if value is None:
305 to_delete.add(name)
306 else:
307 self.set_if_equals(
308 Ref(b"/".join((base, name))), None, value, message=message
309 )
310 if to_delete:
311 try:
312 to_delete.remove(name)
313 except KeyError:
314 pass
315 for ref in to_delete:
316 self.remove_if_equals(Ref(b"/".join((base, ref))), None, message=message)
318 def allkeys(self) -> set[Ref]:
319 """All refs present in this container."""
320 raise NotImplementedError(self.allkeys)
322 def __iter__(self) -> Iterator[Ref]:
323 """Iterate over all reference keys."""
324 return iter(self.allkeys())
326 def keys(self, base: Ref | None = None) -> set[Ref]:
327 """Refs present in this container.
329 Args:
330 base: An optional base to return refs under.
331 Returns: An unsorted set of valid refs in this container, including
332 packed refs.
333 """
334 if base is not None:
335 return self.subkeys(base)
336 else:
337 return self.allkeys()
339 def subkeys(self, base: Ref) -> set[Ref]:
340 """Refs present in this container under a base.
342 Args:
343 base: The base to return refs under.
344 Returns: A set of valid refs in this container under the base; the base
345 prefix is stripped from the ref names returned.
346 """
347 keys: set[Ref] = set()
348 base_len = len(base) + 1
349 for refname in self.allkeys():
350 if refname.startswith(base):
351 keys.add(Ref(refname[base_len:]))
352 return keys
354 def as_dict(self, base: Ref | None = None) -> dict[Ref, ObjectID]:
355 """Return the contents of this container as a dictionary."""
356 ret: dict[Ref, ObjectID] = {}
357 keys = self.keys(base)
358 base_bytes: bytes
359 if base is None:
360 base_bytes = b""
361 else:
362 base_bytes = base.rstrip(b"/")
363 for key in keys:
364 try:
365 ret[key] = self[Ref((base_bytes + b"/" + key).strip(b"/"))]
366 except (SymrefLoop, KeyError):
367 continue # Unable to resolve
369 return ret
371 def _check_refname(self, name: Ref) -> None:
372 """Ensure a refname is valid and lives in refs or is HEAD.
374 HEAD is not a valid refname according to git-check-ref-format, but this
375 class needs to be able to touch HEAD. Also, check_ref_format expects
376 refnames without the leading 'refs/', but this class requires that
377 so it cannot touch anything outside the refs dir (or HEAD).
379 Args:
380 name: The name of the reference.
382 Raises:
383 KeyError: if a refname is not HEAD or is otherwise not valid.
384 """
385 if name in (HEADREF, Ref(b"refs/stash")):
386 return
387 if not name.startswith(b"refs/") or not check_ref_format(Ref(name[5:])):
388 raise RefFormatError(name)
390 def read_ref(self, refname: Ref) -> bytes | None:
391 """Read a reference without following any references.
393 Args:
394 refname: The name of the reference
395 Returns: The contents of the ref file, or None if it does
396 not exist.
397 """
398 contents = self.read_loose_ref(refname)
399 if not contents:
400 contents = self.get_packed_refs().get(refname, None)
401 return contents
403 def read_loose_ref(self, name: Ref) -> bytes | None:
404 """Read a loose reference and return its contents.
406 Args:
407 name: the refname to read
408 Returns: The contents of the ref file, or None if it does
409 not exist.
410 """
411 raise NotImplementedError(self.read_loose_ref)
413 def follow(self, name: Ref) -> tuple[list[Ref], ObjectID | None]:
414 """Follow a reference name.
416 Returns: a tuple of (refnames, sha), wheres refnames are the names of
417 references in the chain
418 """
419 contents: bytes | None = SYMREF + name
420 depth = 0
421 refnames: list[Ref] = []
422 while contents and contents.startswith(SYMREF):
423 refname = Ref(contents[len(SYMREF) :])
424 refnames.append(refname)
425 contents = self.read_ref(refname)
426 if not contents:
427 break
428 depth += 1
429 if depth > 5:
430 raise SymrefLoop(name, depth)
431 return refnames, ObjectID(contents) if contents else None
433 def __contains__(self, refname: Ref) -> bool:
434 """Check if a reference exists."""
435 if self.read_ref(refname):
436 return True
437 return False
439 def __getitem__(self, name: Ref) -> ObjectID:
440 """Get the SHA1 for a reference name.
442 This method follows all symbolic references.
443 """
444 _, sha = self.follow(name)
445 if sha is None:
446 raise KeyError(name)
447 return sha
449 def set_if_equals(
450 self,
451 name: Ref,
452 old_ref: ObjectID | None,
453 new_ref: ObjectID,
454 committer: bytes | None = None,
455 timestamp: int | None = None,
456 timezone: int | None = None,
457 message: bytes | None = None,
458 ) -> bool:
459 """Set a refname to new_ref only if it currently equals old_ref.
461 This method follows all symbolic references if applicable for the
462 subclass, and can be used to perform an atomic compare-and-swap
463 operation.
465 Args:
466 name: The refname to set.
467 old_ref: The old sha the refname must refer to, or None to set
468 unconditionally.
469 new_ref: The new sha the refname will refer to.
470 committer: Optional committer name/email
471 timestamp: Optional timestamp
472 timezone: Optional timezone
473 message: Message for reflog
474 Returns: True if the set was successful, False otherwise.
475 """
476 raise NotImplementedError(self.set_if_equals)
478 def add_if_new(
479 self,
480 name: Ref,
481 ref: ObjectID,
482 committer: bytes | None = None,
483 timestamp: int | None = None,
484 timezone: int | None = None,
485 message: bytes | None = None,
486 ) -> bool:
487 """Add a new reference only if it does not already exist.
489 Args:
490 name: Ref name
491 ref: Ref value
492 committer: Optional committer name/email
493 timestamp: Optional timestamp
494 timezone: Optional timezone
495 message: Optional message for reflog
496 """
497 raise NotImplementedError(self.add_if_new)
499 def __setitem__(self, name: Ref, ref: ObjectID) -> None:
500 """Set a reference name to point to the given SHA1.
502 This method follows all symbolic references if applicable for the
503 subclass.
505 Note: This method unconditionally overwrites the contents of a
506 reference. To update atomically only if the reference has not
507 changed, use set_if_equals().
509 Args:
510 name: The refname to set.
511 ref: The new sha the refname will refer to.
512 """
513 if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
514 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
515 self.set_if_equals(name, None, ref)
517 def remove_if_equals(
518 self,
519 name: Ref,
520 old_ref: ObjectID | None,
521 committer: bytes | None = None,
522 timestamp: int | None = None,
523 timezone: int | None = None,
524 message: bytes | None = None,
525 ) -> bool:
526 """Remove a refname only if it currently equals old_ref.
528 This method does not follow symbolic references, even if applicable for
529 the subclass. It can be used to perform an atomic compare-and-delete
530 operation.
532 Args:
533 name: The refname to delete.
534 old_ref: The old sha the refname must refer to, or None to
535 delete unconditionally.
536 committer: Optional committer name/email
537 timestamp: Optional timestamp
538 timezone: Optional timezone
539 message: Message for reflog
540 Returns: True if the delete was successful, False otherwise.
541 """
542 raise NotImplementedError(self.remove_if_equals)
544 def __delitem__(self, name: Ref) -> None:
545 """Remove a refname.
547 This method does not follow symbolic references, even if applicable for
548 the subclass.
550 Note: This method unconditionally deletes the contents of a reference.
551 To delete atomically only if the reference has not changed, use
552 remove_if_equals().
554 Args:
555 name: The refname to delete.
556 """
557 self.remove_if_equals(name, None)
559 def get_symrefs(self) -> dict[Ref, Ref]:
560 """Get a dict with all symrefs in this container.
562 Returns: Dictionary mapping source ref to target ref
563 """
564 ret: dict[Ref, Ref] = {}
565 for src in self.allkeys():
566 try:
567 ref_value = self.read_ref(src)
568 assert ref_value is not None
569 dst = parse_symref_value(ref_value)
570 except ValueError:
571 pass
572 else:
573 ret[src] = Ref(dst)
574 return ret
576 def pack_refs(self, all: bool = False) -> None:
577 """Pack loose refs into packed-refs file.
579 Args:
580 all: If True, pack all refs. If False, only pack tags.
581 """
582 raise NotImplementedError(self.pack_refs)
585class DictRefsContainer(RefsContainer):
586 """RefsContainer backed by a simple dict.
588 This container does not support symbolic or packed references and is not
589 threadsafe.
590 """
592 def __init__(
593 self,
594 refs: dict[Ref, bytes],
595 logger: Callable[
596 [
597 bytes,
598 bytes | None,
599 bytes | None,
600 bytes | None,
601 int | None,
602 int | None,
603 bytes | None,
604 ],
605 None,
606 ]
607 | None = None,
608 ) -> None:
609 """Initialize DictRefsContainer with refs dictionary and optional logger."""
610 super().__init__(logger=logger)
611 self._refs = refs
612 self._peeled: dict[Ref, ObjectID] = {}
613 self._watchers: set[Any] = set()
615 def allkeys(self) -> set[Ref]:
616 """Return all reference keys."""
617 return set(self._refs.keys())
619 def read_loose_ref(self, name: Ref) -> bytes | None:
620 """Read a loose reference."""
621 return self._refs.get(name, None)
623 def get_packed_refs(self) -> dict[Ref, ObjectID]:
624 """Get packed references."""
625 return {}
627 def _notify(self, ref: bytes, newsha: bytes | None) -> None:
628 for watcher in self._watchers:
629 watcher._notify((ref, newsha))
631 def set_symbolic_ref(
632 self,
633 name: Ref,
634 other: Ref,
635 committer: bytes | None = None,
636 timestamp: int | None = None,
637 timezone: int | None = None,
638 message: bytes | None = None,
639 ) -> None:
640 """Make a ref point at another ref.
642 Args:
643 name: Name of the ref to set
644 other: Name of the ref to point at
645 committer: Optional committer name for reflog
646 timestamp: Optional timestamp for reflog
647 timezone: Optional timezone for reflog
648 message: Optional message for reflog
649 """
650 old = self.follow(name)[-1]
651 new = SYMREF + other
652 self._refs[name] = new
653 self._notify(name, new)
654 self._log(
655 name,
656 old,
657 new,
658 committer=committer,
659 timestamp=timestamp,
660 timezone=timezone,
661 message=message,
662 )
664 def set_if_equals(
665 self,
666 name: Ref,
667 old_ref: ObjectID | None,
668 new_ref: ObjectID,
669 committer: bytes | None = None,
670 timestamp: int | None = None,
671 timezone: int | None = None,
672 message: bytes | None = None,
673 ) -> bool:
674 """Set a refname to new_ref only if it currently equals old_ref.
676 This method follows all symbolic references, and can be used to perform
677 an atomic compare-and-swap operation.
679 Args:
680 name: The refname to set.
681 old_ref: The old sha the refname must refer to, or None to set
682 unconditionally.
683 new_ref: The new sha the refname will refer to.
684 committer: Optional committer name for reflog
685 timestamp: Optional timestamp for reflog
686 timezone: Optional timezone for reflog
687 message: Optional message for reflog
689 Returns:
690 True if the set was successful, False otherwise.
691 """
692 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
693 return False
694 # Only update the specific ref requested, not the whole chain
695 self._check_refname(name)
696 old = self._refs.get(name)
697 self._refs[name] = new_ref
698 self._notify(name, new_ref)
699 self._log(
700 name,
701 old,
702 new_ref,
703 committer=committer,
704 timestamp=timestamp,
705 timezone=timezone,
706 message=message,
707 )
708 return True
710 def add_if_new(
711 self,
712 name: Ref,
713 ref: ObjectID,
714 committer: bytes | None = None,
715 timestamp: int | None = None,
716 timezone: int | None = None,
717 message: bytes | None = None,
718 ) -> bool:
719 """Add a new reference only if it does not already exist.
721 Args:
722 name: Ref name
723 ref: Ref value
724 committer: Optional committer name for reflog
725 timestamp: Optional timestamp for reflog
726 timezone: Optional timezone for reflog
727 message: Optional message for reflog
729 Returns:
730 True if the add was successful, False otherwise.
731 """
732 if name in self._refs:
733 return False
734 self._refs[name] = ref
735 self._notify(name, ref)
736 self._log(
737 name,
738 None,
739 ref,
740 committer=committer,
741 timestamp=timestamp,
742 timezone=timezone,
743 message=message,
744 )
745 return True
747 def remove_if_equals(
748 self,
749 name: Ref,
750 old_ref: ObjectID | None,
751 committer: bytes | None = None,
752 timestamp: int | None = None,
753 timezone: int | None = None,
754 message: bytes | None = None,
755 ) -> bool:
756 """Remove a refname only if it currently equals old_ref.
758 This method does not follow symbolic references. It can be used to
759 perform an atomic compare-and-delete operation.
761 Args:
762 name: The refname to delete.
763 old_ref: The old sha the refname must refer to, or None to
764 delete unconditionally.
765 committer: Optional committer name for reflog
766 timestamp: Optional timestamp for reflog
767 timezone: Optional timezone for reflog
768 message: Optional message for reflog
770 Returns:
771 True if the delete was successful, False otherwise.
772 """
773 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
774 return False
775 try:
776 old = self._refs.pop(name)
777 except KeyError:
778 pass
779 else:
780 self._notify(name, None)
781 self._log(
782 name,
783 old,
784 None,
785 committer=committer,
786 timestamp=timestamp,
787 timezone=timezone,
788 message=message,
789 )
790 return True
792 def get_peeled(self, name: Ref) -> ObjectID | None:
793 """Get peeled version of a reference."""
794 return self._peeled.get(name)
796 def _update(self, refs: Mapping[Ref, ObjectID]) -> None:
797 """Update multiple refs; intended only for testing."""
798 # TODO(dborowitz): replace this with a public function that uses
799 # set_if_equal.
800 for ref, sha in refs.items():
801 self.set_if_equals(ref, None, sha)
803 def _update_peeled(self, peeled: Mapping[Ref, ObjectID]) -> None:
804 """Update cached peeled refs; intended only for testing."""
805 self._peeled.update(peeled)
808class DiskRefsContainer(RefsContainer):
809 """Refs container that reads refs from disk."""
811 def __init__(
812 self,
813 path: str | bytes | os.PathLike[str],
814 worktree_path: str | bytes | os.PathLike[str] | None = None,
815 logger: Callable[
816 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
817 ]
818 | None = None,
819 ) -> None:
820 """Initialize DiskRefsContainer."""
821 super().__init__(logger=logger)
822 # Convert path-like objects to strings, then to bytes for Git compatibility
823 self.path = os.fsencode(os.fspath(path))
824 if worktree_path is None:
825 self.worktree_path = self.path
826 else:
827 self.worktree_path = os.fsencode(os.fspath(worktree_path))
828 self._packed_refs: dict[Ref, ObjectID] | None = None
829 self._peeled_refs: dict[Ref, ObjectID] | None = None
831 def __repr__(self) -> str:
832 """Return string representation of DiskRefsContainer."""
833 return f"{self.__class__.__name__}({self.path!r})"
835 def _iter_dir(
836 self,
837 path: bytes,
838 base: bytes,
839 dir_filter: Callable[[bytes], bool] | None = None,
840 ) -> Iterator[Ref]:
841 refspath = os.path.join(path, base.rstrip(b"/"))
842 prefix_len = len(os.path.join(path, b""))
844 for root, dirs, files in os.walk(refspath):
845 directory = root[prefix_len:]
846 if os.path.sep != "/":
847 directory = directory.replace(os.fsencode(os.path.sep), b"/")
848 if dir_filter is not None:
849 dirs[:] = [
850 d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
851 ]
853 for filename in files:
854 refname = b"/".join([directory, filename])
855 if check_ref_format(Ref(refname)):
856 yield Ref(refname)
858 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[Ref]:
859 base = base.rstrip(b"/") + b"/"
860 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = []
861 if base != b"refs/":
862 path = self.worktree_path if is_per_worktree_ref(base) else self.path
863 search_paths.append((path, None))
864 elif self.worktree_path == self.path:
865 # Iterate through all the refs from the main worktree
866 search_paths.append((self.path, None))
867 else:
868 # Iterate through all the shared refs from the commondir, excluding per-worktree refs
869 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
870 # Iterate through all the per-worktree refs from the worktree's gitdir
871 search_paths.append((self.worktree_path, is_per_worktree_ref))
873 for path, dir_filter in search_paths:
874 yield from self._iter_dir(path, base, dir_filter=dir_filter)
876 def subkeys(self, base: Ref) -> set[Ref]:
877 """Return subkeys under a given base reference path."""
878 subkeys: set[Ref] = set()
880 for key in self._iter_loose_refs(base):
881 if key.startswith(base):
882 subkeys.add(Ref(key[len(base) :].strip(b"/")))
884 for key in self.get_packed_refs():
885 if key.startswith(base):
886 subkeys.add(Ref(key[len(base) :].strip(b"/")))
887 return subkeys
889 def allkeys(self) -> set[Ref]:
890 """Return all reference keys."""
891 allkeys: set[Ref] = set()
892 if os.path.exists(self.refpath(HEADREF)):
893 allkeys.add(Ref(HEADREF))
895 allkeys.update(self._iter_loose_refs())
896 allkeys.update(self.get_packed_refs())
897 return allkeys
899 def refpath(self, name: bytes) -> bytes:
900 """Return the disk path of a ref."""
901 path = name
902 if os.path.sep != "/":
903 path = path.replace(b"/", os.fsencode(os.path.sep))
905 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
906 return os.path.join(root_dir, path)
908 def get_packed_refs(self) -> dict[Ref, ObjectID]:
909 """Get contents of the packed-refs file.
911 Returns: Dictionary mapping ref names to SHA1s
913 Note: Will return an empty dictionary when no packed-refs file is
914 present.
915 """
916 # TODO: invalidate the cache on repacking
917 if self._packed_refs is None:
918 # set both to empty because we want _peeled_refs to be
919 # None if and only if _packed_refs is also None.
920 self._packed_refs = {}
921 self._peeled_refs = {}
922 path = os.path.join(self.path, b"packed-refs")
923 try:
924 f = GitFile(path, "rb")
925 except FileNotFoundError:
926 return {}
927 with f:
928 first_line = next(iter(f)).rstrip()
929 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
930 for sha, name, peeled in read_packed_refs_with_peeled(f):
931 self._packed_refs[name] = sha
932 if peeled:
933 self._peeled_refs[name] = peeled
934 else:
935 f.seek(0)
936 for sha, name in read_packed_refs(f):
937 self._packed_refs[name] = sha
938 return self._packed_refs
940 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
941 """Add the given refs as packed refs.
943 Args:
944 new_refs: A mapping of ref names to targets; if a target is None that
945 means remove the ref
946 """
947 if not new_refs:
948 return
950 path = os.path.join(self.path, b"packed-refs")
952 with GitFile(path, "wb") as f:
953 # reread cached refs from disk, while holding the lock
954 packed_refs = self.get_packed_refs().copy()
956 for ref, target in new_refs.items():
957 # sanity check
958 if ref == HEADREF:
959 raise ValueError("cannot pack HEAD")
961 # remove any loose refs pointing to this one -- please
962 # note that this bypasses remove_if_equals as we don't
963 # want to affect packed refs in here
964 with suppress(OSError):
965 os.remove(self.refpath(ref))
967 if target is not None:
968 packed_refs[ref] = target
969 else:
970 packed_refs.pop(ref, None)
972 write_packed_refs(f, packed_refs, self._peeled_refs)
974 self._packed_refs = packed_refs
976 def get_peeled(self, name: Ref) -> ObjectID | None:
977 """Return the cached peeled value of a ref, if available.
979 Args:
980 name: Name of the ref to peel
981 Returns: The peeled value of the ref. If the ref is known not point to
982 a tag, this will be the SHA the ref refers to. If the ref may point
983 to a tag, but no cached information is available, None is returned.
984 """
985 self.get_packed_refs()
986 if (
987 self._peeled_refs is None
988 or self._packed_refs is None
989 or name not in self._packed_refs
990 ):
991 # No cache: no peeled refs were read, or this ref is loose
992 return None
993 if name in self._peeled_refs:
994 return self._peeled_refs[name]
995 else:
996 # Known not peelable
997 return self[name]
999 def read_loose_ref(self, name: Ref) -> bytes | None:
1000 """Read a reference file and return its contents.
1002 If the reference file a symbolic reference, only read the first line of
1003 the file. Otherwise, read the hash (40 bytes for SHA1, 64 bytes for SHA256).
1005 Args:
1006 name: the refname to read, relative to refpath
1007 Returns: The contents of the ref file, or None if the file does not
1008 exist.
1010 Raises:
1011 IOError: if any other error occurs
1012 """
1013 filename = self.refpath(name)
1014 try:
1015 with GitFile(filename, "rb") as f:
1016 header = f.read(len(SYMREF))
1017 if header == SYMREF:
1018 # Read only the first line
1019 return header + next(iter(f)).rstrip(b"\r\n")
1020 else:
1021 # Read the entire line to get the full hash (handles both SHA1 and SHA256)
1022 f.seek(0)
1023 line = f.readline().rstrip(b"\r\n")
1024 return line
1025 except (OSError, UnicodeError):
1026 # don't assume anything specific about the error; in
1027 # particular, invalid or forbidden paths can raise weird
1028 # errors depending on the specific operating system
1029 return None
1031 def _remove_packed_ref(self, name: Ref) -> None:
1032 if self._packed_refs is None:
1033 return
1034 filename = os.path.join(self.path, b"packed-refs")
1035 # reread cached refs from disk, while holding the lock
1036 f = GitFile(filename, "wb")
1037 try:
1038 self._packed_refs = None
1039 self.get_packed_refs()
1041 if self._packed_refs is None or name not in self._packed_refs:
1042 f.abort()
1043 return
1045 del self._packed_refs[name]
1046 if self._peeled_refs is not None:
1047 with suppress(KeyError):
1048 del self._peeled_refs[name]
1049 write_packed_refs(f, self._packed_refs, self._peeled_refs)
1050 f.close()
1051 except BaseException:
1052 f.abort()
1053 raise
1055 def set_symbolic_ref(
1056 self,
1057 name: Ref,
1058 other: Ref,
1059 committer: bytes | None = None,
1060 timestamp: int | None = None,
1061 timezone: int | None = None,
1062 message: bytes | None = None,
1063 ) -> None:
1064 """Make a ref point at another ref.
1066 Args:
1067 name: Name of the ref to set
1068 other: Name of the ref to point at
1069 committer: Optional committer name
1070 timestamp: Optional timestamp
1071 timezone: Optional timezone
1072 message: Optional message to describe the change
1073 """
1074 self._check_refname(name)
1075 self._check_refname(other)
1076 filename = self.refpath(name)
1077 f = GitFile(filename, "wb")
1078 try:
1079 f.write(SYMREF + other + b"\n")
1080 sha = self.follow(name)[-1]
1081 self._log(
1082 name,
1083 sha,
1084 sha,
1085 committer=committer,
1086 timestamp=timestamp,
1087 timezone=timezone,
1088 message=message,
1089 )
1090 except BaseException:
1091 f.abort()
1092 raise
1093 else:
1094 f.close()
1096 def set_if_equals(
1097 self,
1098 name: Ref,
1099 old_ref: ObjectID | None,
1100 new_ref: ObjectID,
1101 committer: bytes | None = None,
1102 timestamp: int | None = None,
1103 timezone: int | None = None,
1104 message: bytes | None = None,
1105 ) -> bool:
1106 """Set a refname to new_ref only if it currently equals old_ref.
1108 This method follows all symbolic references, and can be used to perform
1109 an atomic compare-and-swap operation.
1111 Args:
1112 name: The refname to set.
1113 old_ref: The old sha the refname must refer to, or None to set
1114 unconditionally.
1115 new_ref: The new sha the refname will refer to.
1116 committer: Optional committer name
1117 timestamp: Optional timestamp
1118 timezone: Optional timezone
1119 message: Set message for reflog
1120 Returns: True if the set was successful, False otherwise.
1121 """
1122 self._check_refname(name)
1123 try:
1124 realnames, _ = self.follow(name)
1125 realname = realnames[-1]
1126 except (KeyError, IndexError, SymrefLoop):
1127 realname = name
1128 filename = self.refpath(realname)
1130 # make sure none of the ancestor folders is in packed refs
1131 probe_ref = Ref(os.path.dirname(realname))
1132 packed_refs = self.get_packed_refs()
1133 while probe_ref:
1134 if packed_refs.get(probe_ref, None) is not None:
1135 raise NotADirectoryError(filename)
1136 probe_ref = Ref(os.path.dirname(probe_ref))
1138 ensure_dir_exists(os.path.dirname(filename))
1139 with GitFile(filename, "wb") as f:
1140 if old_ref is not None:
1141 try:
1142 # read again while holding the lock to handle race conditions
1143 orig_ref = self.read_loose_ref(realname)
1144 if orig_ref is None:
1145 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
1146 if orig_ref != old_ref:
1147 f.abort()
1148 return False
1149 except OSError:
1150 f.abort()
1151 raise
1153 # Check if ref already has the desired value while holding the lock
1154 # This avoids fsync when ref is unchanged but still detects lock conflicts
1155 current_ref = self.read_loose_ref(realname)
1156 if current_ref is None:
1157 current_ref = packed_refs.get(realname, None)
1159 if current_ref is not None and current_ref == new_ref:
1160 # Ref already has desired value, abort write to avoid fsync
1161 f.abort()
1162 return True
1164 try:
1165 f.write(new_ref + b"\n")
1166 except OSError:
1167 f.abort()
1168 raise
1169 self._log(
1170 realname,
1171 old_ref,
1172 new_ref,
1173 committer=committer,
1174 timestamp=timestamp,
1175 timezone=timezone,
1176 message=message,
1177 )
1178 return True
1180 def add_if_new(
1181 self,
1182 name: Ref,
1183 ref: ObjectID,
1184 committer: bytes | None = None,
1185 timestamp: int | None = None,
1186 timezone: int | None = None,
1187 message: bytes | None = None,
1188 ) -> bool:
1189 """Add a new reference only if it does not already exist.
1191 This method follows symrefs, and only ensures that the last ref in the
1192 chain does not exist.
1194 Args:
1195 name: The refname to set.
1196 ref: The new sha the refname will refer to.
1197 committer: Optional committer name
1198 timestamp: Optional timestamp
1199 timezone: Optional timezone
1200 message: Optional message for reflog
1201 Returns: True if the add was successful, False otherwise.
1202 """
1203 try:
1204 realnames, contents = self.follow(name)
1205 if contents is not None:
1206 return False
1207 realname = realnames[-1]
1208 except (KeyError, IndexError):
1209 realname = name
1210 self._check_refname(realname)
1211 filename = self.refpath(realname)
1212 ensure_dir_exists(os.path.dirname(filename))
1213 with GitFile(filename, "wb") as f:
1214 if os.path.exists(filename) or name in self.get_packed_refs():
1215 f.abort()
1216 return False
1217 try:
1218 f.write(ref + b"\n")
1219 except OSError:
1220 f.abort()
1221 raise
1222 else:
1223 self._log(
1224 name,
1225 None,
1226 ref,
1227 committer=committer,
1228 timestamp=timestamp,
1229 timezone=timezone,
1230 message=message,
1231 )
1232 return True
1234 def remove_if_equals(
1235 self,
1236 name: Ref,
1237 old_ref: ObjectID | None,
1238 committer: bytes | None = None,
1239 timestamp: int | None = None,
1240 timezone: int | None = None,
1241 message: bytes | None = None,
1242 ) -> bool:
1243 """Remove a refname only if it currently equals old_ref.
1245 This method does not follow symbolic references. It can be used to
1246 perform an atomic compare-and-delete operation.
1248 Args:
1249 name: The refname to delete.
1250 old_ref: The old sha the refname must refer to, or None to
1251 delete unconditionally.
1252 committer: Optional committer name
1253 timestamp: Optional timestamp
1254 timezone: Optional timezone
1255 message: Optional message
1256 Returns: True if the delete was successful, False otherwise.
1257 """
1258 self._check_refname(name)
1259 filename = self.refpath(name)
1260 ensure_dir_exists(os.path.dirname(filename))
1261 f = GitFile(filename, "wb")
1262 try:
1263 if old_ref is not None:
1264 orig_ref = self.read_loose_ref(name)
1265 if orig_ref is None:
1266 orig_ref = self.get_packed_refs().get(name)
1267 if orig_ref is None:
1268 orig_ref = ZERO_SHA
1269 if orig_ref != old_ref:
1270 return False
1272 # remove the reference file itself
1273 try:
1274 found = os.path.lexists(filename)
1275 except OSError:
1276 # may only be packed, or otherwise unstorable
1277 found = False
1279 if found:
1280 os.remove(filename)
1282 self._remove_packed_ref(name)
1283 self._log(
1284 name,
1285 old_ref,
1286 None,
1287 committer=committer,
1288 timestamp=timestamp,
1289 timezone=timezone,
1290 message=message,
1291 )
1292 finally:
1293 # never write, we just wanted the lock
1294 f.abort()
1296 # outside of the lock, clean-up any parent directory that might now
1297 # be empty. this ensures that re-creating a reference of the same
1298 # name of what was previously a directory works as expected
1299 parent = name
1300 while True:
1301 try:
1302 parent_bytes, _ = parent.rsplit(b"/", 1)
1303 parent = Ref(parent_bytes)
1304 except ValueError:
1305 break
1307 if parent == b"refs":
1308 break
1309 parent_filename = self.refpath(parent)
1310 try:
1311 os.rmdir(parent_filename)
1312 except OSError:
1313 # this can be caused by the parent directory being
1314 # removed by another process, being not empty, etc.
1315 # in any case, this is non fatal because we already
1316 # removed the reference, just ignore it
1317 break
1319 return True
1321 def pack_refs(self, all: bool = False) -> None:
1322 """Pack loose refs into packed-refs file.
1324 Args:
1325 all: If True, pack all refs. If False, only pack tags.
1326 """
1327 refs_to_pack: dict[Ref, ObjectID | None] = {}
1328 for ref in self.allkeys():
1329 if ref == HEADREF:
1330 # Never pack HEAD
1331 continue
1332 if all or ref.startswith(LOCAL_TAG_PREFIX):
1333 try:
1334 sha = self[ref]
1335 if sha:
1336 refs_to_pack[ref] = sha
1337 except KeyError:
1338 # Broken ref, skip it
1339 pass
1341 if refs_to_pack:
1342 self.add_packed_refs(refs_to_pack)
1345def _split_ref_line(line: bytes) -> tuple[ObjectID, Ref]:
1346 """Split a single ref line into a tuple of SHA1 and name."""
1347 fields = line.rstrip(b"\n\r").split(b" ")
1348 if len(fields) != 2:
1349 raise PackedRefsException(f"invalid ref line {line!r}")
1350 sha, name = fields
1351 if not valid_hexsha(sha):
1352 raise PackedRefsException(f"Invalid hex sha {sha!r}")
1353 if not check_ref_format(Ref(name)):
1354 raise PackedRefsException(f"invalid ref name {name!r}")
1355 return (ObjectID(sha), Ref(name))
1358def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[ObjectID, Ref]]:
1359 """Read a packed refs file.
1361 Args:
1362 f: file-like object to read from
1363 Returns: Iterator over tuples with SHA1s and ref names.
1364 """
1365 for line in f:
1366 if line.startswith(b"#"):
1367 # Comment
1368 continue
1369 if line.startswith(b"^"):
1370 raise PackedRefsException("found peeled ref in packed-refs without peeled")
1371 yield _split_ref_line(line)
1374def read_packed_refs_with_peeled(
1375 f: IO[bytes],
1376) -> Iterator[tuple[ObjectID, Ref, ObjectID | None]]:
1377 """Read a packed refs file including peeled refs.
1379 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
1380 with ref names, SHA1s, and peeled SHA1s (or None).
1382 Args:
1383 f: file-like object to read from, seek'ed to the second line
1384 """
1385 last = None
1386 for line in f:
1387 if line.startswith(b"#"):
1388 continue
1389 line = line.rstrip(b"\r\n")
1390 if line.startswith(b"^"):
1391 if not last:
1392 raise PackedRefsException("unexpected peeled ref line")
1393 if not valid_hexsha(line[1:]):
1394 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
1395 sha, name = _split_ref_line(last)
1396 last = None
1397 yield (sha, name, ObjectID(line[1:]))
1398 else:
1399 if last:
1400 sha, name = _split_ref_line(last)
1401 yield (sha, name, None)
1402 last = line
1403 if last:
1404 sha, name = _split_ref_line(last)
1405 yield (sha, name, None)
1408def write_packed_refs(
1409 f: IO[bytes],
1410 packed_refs: Mapping[Ref, ObjectID],
1411 peeled_refs: Mapping[Ref, ObjectID] | None = None,
1412) -> None:
1413 """Write a packed refs file.
1415 Args:
1416 f: empty file-like object to write to
1417 packed_refs: dict of refname to sha of packed refs to write
1418 peeled_refs: dict of refname to peeled value of sha
1419 """
1420 if peeled_refs is None:
1421 peeled_refs = {}
1422 else:
1423 f.write(b"# pack-refs with: peeled\n")
1424 for refname in sorted(packed_refs.keys()):
1425 f.write(git_line(packed_refs[refname], refname))
1426 if refname in peeled_refs:
1427 f.write(b"^" + peeled_refs[refname] + b"\n")
1430def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]:
1431 """Read info/refs file.
1433 Args:
1434 f: File-like object to read from
1436 Returns:
1437 Dictionary mapping ref names to SHA1s
1438 """
1439 ret: dict[Ref, ObjectID] = {}
1440 for line_no, line in enumerate(f.readlines(), 1):
1441 stripped = line.rstrip(b"\r\n")
1442 parts = stripped.split(b"\t", 1)
1443 if len(parts) != 2:
1444 raise ValueError(
1445 f"Invalid info/refs format at line {line_no}: "
1446 f"expected '<sha>\\t<refname>', got {stripped[:100]!r}"
1447 )
1448 (sha, name) = parts
1449 ret[Ref(name)] = ObjectID(sha)
1450 return ret
1453def is_local_branch(x: bytes) -> bool:
1454 """Check if a ref name is a local branch."""
1455 return x.startswith(LOCAL_BRANCH_PREFIX)
1458def local_branch_name(name: bytes) -> Ref:
1459 """Build a full branch ref from a short name.
1461 Args:
1462 name: Short branch name (e.g., b"master") or full ref
1464 Returns:
1465 Full branch ref name (e.g., b"refs/heads/master")
1467 Examples:
1468 >>> local_branch_name(b"master")
1469 b'refs/heads/master'
1470 >>> local_branch_name(b"refs/heads/master")
1471 b'refs/heads/master'
1472 """
1473 if name.startswith(LOCAL_BRANCH_PREFIX):
1474 return Ref(name)
1475 return Ref(LOCAL_BRANCH_PREFIX + name)
1478def local_tag_name(name: bytes) -> Ref:
1479 """Build a full tag ref from a short name.
1481 Args:
1482 name: Short tag name (e.g., b"v1.0") or full ref
1484 Returns:
1485 Full tag ref name (e.g., b"refs/tags/v1.0")
1487 Examples:
1488 >>> local_tag_name(b"v1.0")
1489 b'refs/tags/v1.0'
1490 >>> local_tag_name(b"refs/tags/v1.0")
1491 b'refs/tags/v1.0'
1492 """
1493 if name.startswith(LOCAL_TAG_PREFIX):
1494 return Ref(name)
1495 return Ref(LOCAL_TAG_PREFIX + name)
1498def local_replace_name(name: bytes) -> Ref:
1499 """Build a full replace ref from a short name.
1501 Args:
1502 name: Short replace name (object SHA) or full ref
1504 Returns:
1505 Full replace ref name (e.g., b"refs/replace/<sha>")
1507 Examples:
1508 >>> local_replace_name(b"abc123")
1509 b'refs/replace/abc123'
1510 >>> local_replace_name(b"refs/replace/abc123")
1511 b'refs/replace/abc123'
1512 """
1513 if name.startswith(LOCAL_REPLACE_PREFIX):
1514 return Ref(name)
1515 return Ref(LOCAL_REPLACE_PREFIX + name)
1518def extract_branch_name(ref: bytes) -> bytes:
1519 """Extract branch name from a full branch ref.
1521 Args:
1522 ref: Full branch ref (e.g., b"refs/heads/master")
1524 Returns:
1525 Short branch name (e.g., b"master")
1527 Raises:
1528 ValueError: If ref is not a local branch
1530 Examples:
1531 >>> extract_branch_name(b"refs/heads/master")
1532 b'master'
1533 >>> extract_branch_name(b"refs/heads/feature/foo")
1534 b'feature/foo'
1535 """
1536 if not ref.startswith(LOCAL_BRANCH_PREFIX):
1537 raise ValueError(f"Not a local branch ref: {ref!r}")
1538 return ref[len(LOCAL_BRANCH_PREFIX) :]
1541def extract_tag_name(ref: bytes) -> bytes:
1542 """Extract tag name from a full tag ref.
1544 Args:
1545 ref: Full tag ref (e.g., b"refs/tags/v1.0")
1547 Returns:
1548 Short tag name (e.g., b"v1.0")
1550 Raises:
1551 ValueError: If ref is not a local tag
1553 Examples:
1554 >>> extract_tag_name(b"refs/tags/v1.0")
1555 b'v1.0'
1556 """
1557 if not ref.startswith(LOCAL_TAG_PREFIX):
1558 raise ValueError(f"Not a local tag ref: {ref!r}")
1559 return ref[len(LOCAL_TAG_PREFIX) :]
1562def shorten_ref_name(ref: bytes) -> bytes:
1563 """Convert a full ref name to its short form.
1565 Args:
1566 ref: Full ref name (e.g., b"refs/heads/master")
1568 Returns:
1569 Short ref name (e.g., b"master")
1571 Examples:
1572 >>> shorten_ref_name(b"refs/heads/master")
1573 b'master'
1574 >>> shorten_ref_name(b"refs/remotes/origin/main")
1575 b'origin/main'
1576 >>> shorten_ref_name(b"refs/tags/v1.0")
1577 b'v1.0'
1578 >>> shorten_ref_name(b"HEAD")
1579 b'HEAD'
1580 """
1581 if ref.startswith(LOCAL_BRANCH_PREFIX):
1582 return ref[len(LOCAL_BRANCH_PREFIX) :]
1583 elif ref.startswith(LOCAL_REMOTE_PREFIX):
1584 return ref[len(LOCAL_REMOTE_PREFIX) :]
1585 elif ref.startswith(LOCAL_TAG_PREFIX):
1586 return ref[len(LOCAL_TAG_PREFIX) :]
1587 return ref
1590def _set_origin_head(
1591 refs: RefsContainer, origin: bytes, origin_head: bytes | None
1592) -> None:
1593 # set refs/remotes/origin/HEAD
1594 origin_base = b"refs/remotes/" + origin + b"/"
1595 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
1596 origin_ref = Ref(origin_base + HEADREF)
1597 target_ref = Ref(origin_base + extract_branch_name(origin_head))
1598 if target_ref in refs:
1599 refs.set_symbolic_ref(origin_ref, target_ref)
1602def _set_default_branch(
1603 refs: RefsContainer,
1604 origin: bytes,
1605 origin_head: bytes | None,
1606 branch: bytes | None,
1607 ref_message: bytes | None,
1608) -> bytes:
1609 """Set the default branch."""
1610 origin_base = b"refs/remotes/" + origin + b"/"
1611 if branch:
1612 origin_ref = Ref(origin_base + branch)
1613 if origin_ref in refs:
1614 local_ref = Ref(local_branch_name(branch))
1615 refs.add_if_new(local_ref, refs[origin_ref], ref_message)
1616 head_ref = local_ref
1617 elif Ref(local_tag_name(branch)) in refs:
1618 head_ref = Ref(local_tag_name(branch))
1619 else:
1620 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
1621 elif origin_head:
1622 head_ref = Ref(origin_head)
1623 if origin_head.startswith(LOCAL_BRANCH_PREFIX):
1624 origin_ref = Ref(origin_base + extract_branch_name(origin_head))
1625 else:
1626 origin_ref = Ref(origin_head)
1627 try:
1628 refs.add_if_new(head_ref, refs[origin_ref], ref_message)
1629 except KeyError:
1630 pass
1631 else:
1632 raise ValueError("neither origin_head nor branch are provided")
1633 return head_ref
1636def _set_head(
1637 refs: RefsContainer, head_ref: bytes, ref_message: bytes | None
1638) -> ObjectID | None:
1639 if head_ref.startswith(LOCAL_TAG_PREFIX):
1640 # detach HEAD at specified tag
1641 head = refs[Ref(head_ref)]
1642 if isinstance(head, Tag):
1643 _cls, obj = head.object
1644 head = obj.get_object(obj).id
1645 del refs[HEADREF]
1646 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1647 else:
1648 # set HEAD to specific branch
1649 try:
1650 head = refs[Ref(head_ref)]
1651 refs.set_symbolic_ref(HEADREF, Ref(head_ref))
1652 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1653 except KeyError:
1654 head = None
1655 return head
1658def _import_remote_refs(
1659 refs_container: RefsContainer,
1660 remote_name: str,
1661 refs: Mapping[Ref, ObjectID | None],
1662 message: bytes | None = None,
1663 prune: bool = False,
1664 prune_tags: bool = False,
1665) -> None:
1666 from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs
1668 stripped_refs = strip_peeled_refs(refs)
1669 branches: dict[Ref, ObjectID | None] = {
1670 Ref(extract_branch_name(n)): v
1671 for (n, v) in stripped_refs.items()
1672 if n.startswith(LOCAL_BRANCH_PREFIX)
1673 }
1674 refs_container.import_refs(
1675 Ref(b"refs/remotes/" + remote_name.encode()),
1676 branches,
1677 message=message,
1678 prune=prune,
1679 )
1680 tags: dict[Ref, ObjectID | None] = {
1681 Ref(extract_tag_name(n)): v
1682 for (n, v) in stripped_refs.items()
1683 if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX)
1684 }
1685 refs_container.import_refs(
1686 Ref(LOCAL_TAG_PREFIX), tags, message=message, prune=prune_tags
1687 )
1690class locked_ref:
1691 """Lock a ref while making modifications.
1693 Works as a context manager.
1694 """
1696 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
1697 """Initialize a locked ref.
1699 Args:
1700 refs_container: The DiskRefsContainer to lock the ref in
1701 refname: The ref name to lock
1702 """
1703 self._refs_container = refs_container
1704 self._refname = refname
1705 self._file: _GitFile | None = None
1706 self._realname: Ref | None = None
1707 self._deleted = False
1709 def __enter__(self) -> "locked_ref":
1710 """Enter the context manager and acquire the lock.
1712 Returns:
1713 This locked_ref instance
1715 Raises:
1716 OSError: If the lock cannot be acquired
1717 """
1718 self._refs_container._check_refname(self._refname)
1719 try:
1720 realnames, _ = self._refs_container.follow(self._refname)
1721 self._realname = realnames[-1]
1722 except (KeyError, IndexError, SymrefLoop):
1723 self._realname = self._refname
1725 filename = self._refs_container.refpath(self._realname)
1726 ensure_dir_exists(os.path.dirname(filename))
1727 f = GitFile(filename, "wb")
1728 self._file = f
1729 return self
1731 def __exit__(
1732 self,
1733 exc_type: type | None,
1734 exc_value: BaseException | None,
1735 traceback: types.TracebackType | None,
1736 ) -> None:
1737 """Exit the context manager and release the lock.
1739 Args:
1740 exc_type: Type of exception if one occurred
1741 exc_value: Exception instance if one occurred
1742 traceback: Traceback if an exception occurred
1743 """
1744 if self._file:
1745 if exc_type is not None or self._deleted:
1746 self._file.abort()
1747 else:
1748 self._file.close()
1750 def get(self) -> bytes | None:
1751 """Get the current value of the ref."""
1752 if not self._file:
1753 raise RuntimeError("locked_ref not in context")
1755 assert self._realname is not None
1756 current_ref = self._refs_container.read_loose_ref(self._realname)
1757 if current_ref is None:
1758 current_ref = self._refs_container.get_packed_refs().get(
1759 self._realname, None
1760 )
1761 return current_ref
1763 def ensure_equals(self, expected_value: bytes | None) -> bool:
1764 """Ensure the ref currently equals the expected value.
1766 Args:
1767 expected_value: The expected current value of the ref
1768 Returns:
1769 True if the ref equals the expected value, False otherwise
1770 """
1771 current_value = self.get()
1772 return current_value == expected_value
1774 def set(self, new_ref: bytes) -> None:
1775 """Set the ref to a new value.
1777 Args:
1778 new_ref: The new SHA1 or symbolic ref value
1779 """
1780 if not self._file:
1781 raise RuntimeError("locked_ref not in context")
1783 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
1784 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
1786 self._file.seek(0)
1787 self._file.truncate()
1788 self._file.write(new_ref + b"\n")
1789 self._deleted = False
1791 def set_symbolic_ref(self, target: Ref) -> None:
1792 """Make this ref point at another ref.
1794 Args:
1795 target: Name of the ref to point at
1796 """
1797 if not self._file:
1798 raise RuntimeError("locked_ref not in context")
1800 self._refs_container._check_refname(target)
1801 self._file.seek(0)
1802 self._file.truncate()
1803 self._file.write(SYMREF + target + b"\n")
1804 self._deleted = False
1806 def delete(self) -> None:
1807 """Delete the ref file while holding the lock."""
1808 if not self._file:
1809 raise RuntimeError("locked_ref not in context")
1811 # Delete the actual ref file while holding the lock
1812 if self._realname:
1813 filename = self._refs_container.refpath(self._realname)
1814 try:
1815 if os.path.lexists(filename):
1816 os.remove(filename)
1817 except FileNotFoundError:
1818 pass
1819 self._refs_container._remove_packed_ref(self._realname)
1821 self._deleted = True
1824class NamespacedRefsContainer(RefsContainer):
1825 """Wrapper that adds namespace prefix to all ref operations.
1827 This implements Git's GIT_NAMESPACE feature, which stores refs under
1828 refs/namespaces/<namespace>/ and filters operations to only show refs
1829 within that namespace.
1831 Example:
1832 With namespace "foo", a ref "refs/heads/master" is stored as
1833 "refs/namespaces/foo/refs/heads/master" in the underlying container.
1834 """
1836 def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
1837 """Initialize NamespacedRefsContainer.
1839 Args:
1840 refs: The underlying refs container to wrap
1841 namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
1842 """
1843 super().__init__(logger=refs._logger)
1844 self._refs = refs
1845 # Build namespace prefix: refs/namespaces/<namespace>/
1846 # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
1847 namespace_parts = namespace.split(b"/")
1848 self._namespace_prefix = b""
1849 for part in namespace_parts:
1850 self._namespace_prefix += b"refs/namespaces/" + part + b"/"
1852 def _apply_namespace(self, name: bytes) -> bytes:
1853 """Apply namespace prefix to a ref name."""
1854 # HEAD and other special refs are not namespaced
1855 if name == HEADREF or not name.startswith(b"refs/"):
1856 return name
1857 return self._namespace_prefix + name
1859 def _strip_namespace(self, name: bytes) -> bytes | None:
1860 """Remove namespace prefix from a ref name.
1862 Returns None if the ref is not in our namespace.
1863 """
1864 # HEAD and other special refs are not namespaced
1865 if name == HEADREF or not name.startswith(b"refs/"):
1866 return name
1867 if name.startswith(self._namespace_prefix):
1868 return name[len(self._namespace_prefix) :]
1869 return None
1871 def allkeys(self) -> set[Ref]:
1872 """Return all reference keys in this namespace."""
1873 keys: set[Ref] = set()
1874 for key in self._refs.allkeys():
1875 stripped = self._strip_namespace(key)
1876 if stripped is not None:
1877 keys.add(Ref(stripped))
1878 return keys
1880 def read_loose_ref(self, name: Ref) -> bytes | None:
1881 """Read a loose reference."""
1882 return self._refs.read_loose_ref(Ref(self._apply_namespace(name)))
1884 def get_packed_refs(self) -> dict[Ref, ObjectID]:
1885 """Get packed refs within this namespace."""
1886 packed: dict[Ref, ObjectID] = {}
1887 for name, value in self._refs.get_packed_refs().items():
1888 stripped = self._strip_namespace(name)
1889 if stripped is not None:
1890 packed[Ref(stripped)] = value
1891 return packed
1893 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
1894 """Add packed refs with namespace prefix."""
1895 namespaced_refs: dict[Ref, ObjectID | None] = {
1896 Ref(self._apply_namespace(name)): value for name, value in new_refs.items()
1897 }
1898 self._refs.add_packed_refs(namespaced_refs)
1900 def get_peeled(self, name: Ref) -> ObjectID | None:
1901 """Return the cached peeled value of a ref."""
1902 return self._refs.get_peeled(Ref(self._apply_namespace(name)))
1904 def set_symbolic_ref(
1905 self,
1906 name: Ref,
1907 other: Ref,
1908 committer: bytes | None = None,
1909 timestamp: int | None = None,
1910 timezone: int | None = None,
1911 message: bytes | None = None,
1912 ) -> None:
1913 """Make a ref point at another ref."""
1914 self._refs.set_symbolic_ref(
1915 Ref(self._apply_namespace(name)),
1916 Ref(self._apply_namespace(other)),
1917 committer=committer,
1918 timestamp=timestamp,
1919 timezone=timezone,
1920 message=message,
1921 )
1923 def set_if_equals(
1924 self,
1925 name: Ref,
1926 old_ref: ObjectID | None,
1927 new_ref: ObjectID,
1928 committer: bytes | None = None,
1929 timestamp: int | None = None,
1930 timezone: int | None = None,
1931 message: bytes | None = None,
1932 ) -> bool:
1933 """Set a refname to new_ref only if it currently equals old_ref."""
1934 return self._refs.set_if_equals(
1935 Ref(self._apply_namespace(name)),
1936 old_ref,
1937 new_ref,
1938 committer=committer,
1939 timestamp=timestamp,
1940 timezone=timezone,
1941 message=message,
1942 )
1944 def add_if_new(
1945 self,
1946 name: Ref,
1947 ref: ObjectID,
1948 committer: bytes | None = None,
1949 timestamp: int | None = None,
1950 timezone: int | None = None,
1951 message: bytes | None = None,
1952 ) -> bool:
1953 """Add a new reference only if it does not already exist."""
1954 return self._refs.add_if_new(
1955 Ref(self._apply_namespace(name)),
1956 ref,
1957 committer=committer,
1958 timestamp=timestamp,
1959 timezone=timezone,
1960 message=message,
1961 )
1963 def remove_if_equals(
1964 self,
1965 name: Ref,
1966 old_ref: ObjectID | None,
1967 committer: bytes | None = None,
1968 timestamp: int | None = None,
1969 timezone: int | None = None,
1970 message: bytes | None = None,
1971 ) -> bool:
1972 """Remove a refname only if it currently equals old_ref."""
1973 return self._refs.remove_if_equals(
1974 Ref(self._apply_namespace(name)),
1975 old_ref,
1976 committer=committer,
1977 timestamp=timestamp,
1978 timezone=timezone,
1979 message=message,
1980 )
1982 def pack_refs(self, all: bool = False) -> None:
1983 """Pack loose refs into packed-refs file.
1985 Note: This packs all refs in the underlying container, not just
1986 those in the namespace.
1987 """
1988 self._refs.pack_refs(all=all)
1991def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
1992 """Filter refs to only include those with a given prefix.
1994 Args:
1995 refs: A dictionary of refs.
1996 prefixes: The prefixes to filter by.
1997 """
1998 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
1999 return filtered
2002def is_per_worktree_ref(ref: bytes) -> bool:
2003 """Returns whether a reference is stored per worktree or not.
2005 Per-worktree references are:
2006 - all pseudorefs, e.g. HEAD
2007 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
2009 All refs starting with "refs/" are shared, except for the ones listed above.
2011 See https://git-scm.com/docs/git-worktree#_refs.
2012 """
2013 return not ref.startswith(b"refs/") or ref.startswith(
2014 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
2015 )