Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25__all__ = [
26 "HEADREF",
27 "LOCAL_BRANCH_PREFIX",
28 "LOCAL_NOTES_PREFIX",
29 "LOCAL_REMOTE_PREFIX",
30 "LOCAL_REPLACE_PREFIX",
31 "LOCAL_TAG_PREFIX",
32 "SYMREF",
33 "DictRefsContainer",
34 "DiskRefsContainer",
35 "NamespacedRefsContainer",
36 "Ref",
37 "RefsContainer",
38 "SymrefLoop",
39 "check_ref_format",
40 "extract_branch_name",
41 "extract_tag_name",
42 "filter_ref_prefix",
43 "is_local_branch",
44 "is_per_worktree_ref",
45 "local_branch_name",
46 "local_replace_name",
47 "local_tag_name",
48 "parse_remote_ref",
49 "parse_symref_value",
50 "read_info_refs",
51 "read_packed_refs",
52 "read_packed_refs_with_peeled",
53 "set_ref_from_raw",
54 "shorten_ref_name",
55 "write_packed_refs",
56]
58import os
59import types
60from collections.abc import Callable, Iterable, Iterator, Mapping
61from contextlib import suppress
62from typing import (
63 IO,
64 TYPE_CHECKING,
65 Any,
66 BinaryIO,
67 NewType,
68 TypeVar,
69)
71if TYPE_CHECKING:
72 from .file import _GitFile
74from .errors import PackedRefsException, RefFormatError
75from .file import GitFile, ensure_dir_exists
76from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
78Ref = NewType("Ref", bytes)
80T = TypeVar("T", dict[Ref, ObjectID], dict[Ref, ObjectID | None])
82HEADREF = Ref(b"HEAD")
83SYMREF = b"ref: "
84LOCAL_BRANCH_PREFIX = b"refs/heads/"
85LOCAL_TAG_PREFIX = b"refs/tags/"
86LOCAL_REMOTE_PREFIX = b"refs/remotes/"
87LOCAL_NOTES_PREFIX = b"refs/notes/"
88LOCAL_REPLACE_PREFIX = b"refs/replace/"
89BAD_REF_CHARS: set[int] = set(b"\177 ~^:?*[")
92class SymrefLoop(Exception):
93 """There is a loop between one or more symrefs."""
95 def __init__(self, ref: bytes, depth: int) -> None:
96 """Initialize SymrefLoop exception."""
97 self.ref = ref
98 self.depth = depth
101def parse_symref_value(contents: bytes) -> bytes:
102 """Parse a symref value.
104 Args:
105 contents: Contents to parse
106 Returns: Destination
107 """
108 if contents.startswith(SYMREF):
109 return contents[len(SYMREF) :].rstrip(b"\r\n")
110 raise ValueError(contents)
113def check_ref_format(refname: Ref) -> bool:
114 """Check if a refname is correctly formatted.
116 Implements all the same rules as git-check-ref-format[1].
118 [1]
119 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
121 Args:
122 refname: The refname to check
123 Returns: True if refname is valid, False otherwise
124 """
125 # These could be combined into one big expression, but are listed
126 # separately to parallel [1].
127 if b"/." in refname or refname.startswith(b"."): # type: ignore[comparison-overlap]
128 return False
129 if b"/" not in refname: # type: ignore[comparison-overlap]
130 return False
131 if b".." in refname: # type: ignore[comparison-overlap]
132 return False
133 for i, c in enumerate(refname):
134 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
135 return False
136 if refname[-1] in b"/.":
137 return False
138 if refname.endswith(b".lock"):
139 return False
140 if b"@{" in refname: # type: ignore[comparison-overlap]
141 return False
142 if b"\\" in refname: # type: ignore[comparison-overlap]
143 return False
144 return True
147def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
148 """Parse a remote ref into remote name and branch name.
150 Args:
151 ref: Remote ref like b"refs/remotes/origin/main"
153 Returns:
154 Tuple of (remote_name, branch_name)
156 Raises:
157 ValueError: If ref is not a valid remote ref
158 """
159 if not ref.startswith(LOCAL_REMOTE_PREFIX):
160 raise ValueError(f"Not a remote ref: {ref!r}")
162 # Remove the prefix
163 remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
165 # Split into remote name and branch name
166 parts = remainder.split(b"/", 1)
167 if len(parts) != 2:
168 raise ValueError(f"Invalid remote ref format: {ref!r}")
170 remote_name, branch_name = parts
171 return (remote_name, branch_name)
174def set_ref_from_raw(refs: "RefsContainer", name: Ref, raw_ref: bytes) -> None:
175 """Set a reference from a raw ref value.
177 This handles both symbolic refs (starting with 'ref: ') and direct ObjectID refs.
179 Args:
180 refs: The RefsContainer to set the ref in
181 name: The ref name to set
182 raw_ref: The raw ref value (either a symbolic ref or an ObjectID)
183 """
184 if raw_ref.startswith(SYMREF):
185 # It's a symbolic ref
186 target = Ref(raw_ref[len(SYMREF) :])
187 refs.set_symbolic_ref(name, target)
188 else:
189 # It's a direct ObjectID
190 refs[name] = ObjectID(raw_ref)
193class RefsContainer:
194 """A container for refs."""
196 def __init__(
197 self,
198 logger: Callable[
199 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
200 ]
201 | None = None,
202 ) -> None:
203 """Initialize RefsContainer with optional logger function."""
204 self._logger = logger
206 def _log(
207 self,
208 ref: bytes,
209 old_sha: bytes | None,
210 new_sha: bytes | None,
211 committer: bytes | None = None,
212 timestamp: int | None = None,
213 timezone: int | None = None,
214 message: bytes | None = None,
215 ) -> None:
216 if self._logger is None:
217 return
218 if message is None:
219 return
220 # Use ZERO_SHA for None values, matching git behavior
221 if old_sha is None:
222 old_sha = ZERO_SHA
223 if new_sha is None:
224 new_sha = ZERO_SHA
225 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
227 def set_symbolic_ref(
228 self,
229 name: Ref,
230 other: Ref,
231 committer: bytes | None = None,
232 timestamp: int | None = None,
233 timezone: int | None = None,
234 message: bytes | None = None,
235 ) -> None:
236 """Make a ref point at another ref.
238 Args:
239 name: Name of the ref to set
240 other: Name of the ref to point at
241 committer: Optional committer name/email
242 timestamp: Optional timestamp
243 timezone: Optional timezone
244 message: Optional message
245 """
246 raise NotImplementedError(self.set_symbolic_ref)
248 def get_packed_refs(self) -> dict[Ref, ObjectID]:
249 """Get contents of the packed-refs file.
251 Returns: Dictionary mapping ref names to SHA1s
253 Note: Will return an empty dictionary when no packed-refs file is
254 present.
255 """
256 raise NotImplementedError(self.get_packed_refs)
258 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
259 """Add the given refs as packed refs.
261 Args:
262 new_refs: A mapping of ref names to targets; if a target is None that
263 means remove the ref
264 """
265 raise NotImplementedError(self.add_packed_refs)
267 def get_peeled(self, name: Ref) -> ObjectID | None:
268 """Return the cached peeled value of a ref, if available.
270 Args:
271 name: Name of the ref to peel
272 Returns: The peeled value of the ref. If the ref is known not point to
273 a tag, this will be the SHA the ref refers to. If the ref may point
274 to a tag, but no cached information is available, None is returned.
275 """
276 return None
278 def import_refs(
279 self,
280 base: Ref,
281 other: Mapping[Ref, ObjectID | None],
282 committer: bytes | None = None,
283 timestamp: bytes | None = None,
284 timezone: bytes | None = None,
285 message: bytes | None = None,
286 prune: bool = False,
287 ) -> None:
288 """Import refs from another repository.
290 Args:
291 base: Base ref to import into (e.g., b'refs/remotes/origin')
292 other: Dictionary of refs to import
293 committer: Optional committer for reflog
294 timestamp: Optional timestamp for reflog
295 timezone: Optional timezone for reflog
296 message: Optional message for reflog
297 prune: If True, remove refs not in other
298 """
299 if prune:
300 to_delete = set(self.subkeys(base))
301 else:
302 to_delete = set()
303 for name, value in other.items():
304 if value is None:
305 to_delete.add(name)
306 else:
307 self.set_if_equals(
308 Ref(b"/".join((base, name))), None, value, message=message
309 )
310 if to_delete:
311 try:
312 to_delete.remove(name)
313 except KeyError:
314 pass
315 for ref in to_delete:
316 self.remove_if_equals(Ref(b"/".join((base, ref))), None, message=message)
318 def allkeys(self) -> set[Ref]:
319 """All refs present in this container."""
320 raise NotImplementedError(self.allkeys)
322 def __iter__(self) -> Iterator[Ref]:
323 """Iterate over all reference keys."""
324 return iter(self.allkeys())
326 def keys(self, base: Ref | None = None) -> set[Ref]:
327 """Refs present in this container.
329 Args:
330 base: An optional base to return refs under.
331 Returns: An unsorted set of valid refs in this container, including
332 packed refs.
333 """
334 if base is not None:
335 return self.subkeys(base)
336 else:
337 return self.allkeys()
339 def subkeys(self, base: Ref) -> set[Ref]:
340 """Refs present in this container under a base.
342 Args:
343 base: The base to return refs under.
344 Returns: A set of valid refs in this container under the base; the base
345 prefix is stripped from the ref names returned.
346 """
347 keys: set[Ref] = set()
348 base_len = len(base) + 1
349 for refname in self.allkeys():
350 if refname.startswith(base):
351 keys.add(Ref(refname[base_len:]))
352 return keys
354 def as_dict(self, base: Ref | None = None) -> dict[Ref, ObjectID]:
355 """Return the contents of this container as a dictionary."""
356 ret: dict[Ref, ObjectID] = {}
357 keys = self.keys(base)
358 base_bytes: bytes
359 if base is None:
360 base_bytes = b""
361 else:
362 base_bytes = base.rstrip(b"/")
363 for key in keys:
364 try:
365 ret[key] = self[Ref((base_bytes + b"/" + key).strip(b"/"))]
366 except (SymrefLoop, KeyError):
367 continue # Unable to resolve
369 return ret
371 def _check_refname(self, name: Ref) -> None:
372 """Ensure a refname is valid and lives in refs or is HEAD.
374 HEAD is not a valid refname according to git-check-ref-format, but this
375 class needs to be able to touch HEAD. Also, check_ref_format expects
376 refnames without the leading 'refs/', but this class requires that
377 so it cannot touch anything outside the refs dir (or HEAD).
379 Args:
380 name: The name of the reference.
382 Raises:
383 KeyError: if a refname is not HEAD or is otherwise not valid.
384 """
385 if name in (HEADREF, Ref(b"refs/stash")):
386 return
387 if not name.startswith(b"refs/") or not check_ref_format(Ref(name[5:])):
388 raise RefFormatError(name)
390 def read_ref(self, refname: Ref) -> bytes | None:
391 """Read a reference without following any references.
393 Args:
394 refname: The name of the reference
395 Returns: The contents of the ref file, or None if it does
396 not exist.
397 """
398 contents = self.read_loose_ref(refname)
399 if not contents:
400 contents = self.get_packed_refs().get(refname, None)
401 return contents
403 def read_loose_ref(self, name: Ref) -> bytes | None:
404 """Read a loose reference and return its contents.
406 Args:
407 name: the refname to read
408 Returns: The contents of the ref file, or None if it does
409 not exist.
410 """
411 raise NotImplementedError(self.read_loose_ref)
413 def follow(self, name: Ref) -> tuple[list[Ref], ObjectID | None]:
414 """Follow a reference name.
416 Returns: a tuple of (refnames, sha), wheres refnames are the names of
417 references in the chain
418 """
419 contents: bytes | None = SYMREF + name
420 depth = 0
421 refnames: list[Ref] = []
422 while contents and contents.startswith(SYMREF):
423 refname = Ref(contents[len(SYMREF) :])
424 refnames.append(refname)
425 contents = self.read_ref(refname)
426 if not contents:
427 break
428 depth += 1
429 if depth > 5:
430 raise SymrefLoop(name, depth)
431 return refnames, ObjectID(contents) if contents else None
433 def __contains__(self, refname: Ref) -> bool:
434 """Check if a reference exists."""
435 if self.read_ref(refname):
436 return True
437 return False
439 def __getitem__(self, name: Ref) -> ObjectID:
440 """Get the SHA1 for a reference name.
442 This method follows all symbolic references.
443 """
444 _, sha = self.follow(name)
445 if sha is None:
446 raise KeyError(name)
447 return sha
449 def set_if_equals(
450 self,
451 name: Ref,
452 old_ref: ObjectID | None,
453 new_ref: ObjectID,
454 committer: bytes | None = None,
455 timestamp: int | None = None,
456 timezone: int | None = None,
457 message: bytes | None = None,
458 ) -> bool:
459 """Set a refname to new_ref only if it currently equals old_ref.
461 This method follows all symbolic references if applicable for the
462 subclass, and can be used to perform an atomic compare-and-swap
463 operation.
465 Args:
466 name: The refname to set.
467 old_ref: The old sha the refname must refer to, or None to set
468 unconditionally.
469 new_ref: The new sha the refname will refer to.
470 committer: Optional committer name/email
471 timestamp: Optional timestamp
472 timezone: Optional timezone
473 message: Message for reflog
474 Returns: True if the set was successful, False otherwise.
475 """
476 raise NotImplementedError(self.set_if_equals)
478 def add_if_new(
479 self,
480 name: Ref,
481 ref: ObjectID,
482 committer: bytes | None = None,
483 timestamp: int | None = None,
484 timezone: int | None = None,
485 message: bytes | None = None,
486 ) -> bool:
487 """Add a new reference only if it does not already exist.
489 Args:
490 name: Ref name
491 ref: Ref value
492 committer: Optional committer name/email
493 timestamp: Optional timestamp
494 timezone: Optional timezone
495 message: Optional message for reflog
496 """
497 raise NotImplementedError(self.add_if_new)
499 def __setitem__(self, name: Ref, ref: ObjectID) -> None:
500 """Set a reference name to point to the given SHA1.
502 This method follows all symbolic references if applicable for the
503 subclass.
505 Note: This method unconditionally overwrites the contents of a
506 reference. To update atomically only if the reference has not
507 changed, use set_if_equals().
509 Args:
510 name: The refname to set.
511 ref: The new sha the refname will refer to.
512 """
513 if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
514 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
515 self.set_if_equals(name, None, ref)
517 def remove_if_equals(
518 self,
519 name: Ref,
520 old_ref: ObjectID | None,
521 committer: bytes | None = None,
522 timestamp: int | None = None,
523 timezone: int | None = None,
524 message: bytes | None = None,
525 ) -> bool:
526 """Remove a refname only if it currently equals old_ref.
528 This method does not follow symbolic references, even if applicable for
529 the subclass. It can be used to perform an atomic compare-and-delete
530 operation.
532 Args:
533 name: The refname to delete.
534 old_ref: The old sha the refname must refer to, or None to
535 delete unconditionally.
536 committer: Optional committer name/email
537 timestamp: Optional timestamp
538 timezone: Optional timezone
539 message: Message for reflog
540 Returns: True if the delete was successful, False otherwise.
541 """
542 raise NotImplementedError(self.remove_if_equals)
544 def __delitem__(self, name: Ref) -> None:
545 """Remove a refname.
547 This method does not follow symbolic references, even if applicable for
548 the subclass.
550 Note: This method unconditionally deletes the contents of a reference.
551 To delete atomically only if the reference has not changed, use
552 remove_if_equals().
554 Args:
555 name: The refname to delete.
556 """
557 self.remove_if_equals(name, None)
559 def get_symrefs(self) -> dict[Ref, Ref]:
560 """Get a dict with all symrefs in this container.
562 Returns: Dictionary mapping source ref to target ref
563 """
564 ret: dict[Ref, Ref] = {}
565 for src in self.allkeys():
566 try:
567 ref_value = self.read_ref(src)
568 assert ref_value is not None
569 dst = parse_symref_value(ref_value)
570 except ValueError:
571 pass
572 else:
573 ret[src] = Ref(dst)
574 return ret
576 def pack_refs(self, all: bool = False) -> None:
577 """Pack loose refs into packed-refs file.
579 Args:
580 all: If True, pack all refs. If False, only pack tags.
581 """
582 raise NotImplementedError(self.pack_refs)
585class DictRefsContainer(RefsContainer):
586 """RefsContainer backed by a simple dict.
588 This container does not support symbolic or packed references and is not
589 threadsafe.
590 """
592 def __init__(
593 self,
594 refs: dict[Ref, bytes],
595 logger: Callable[
596 [
597 bytes,
598 bytes | None,
599 bytes | None,
600 bytes | None,
601 int | None,
602 int | None,
603 bytes | None,
604 ],
605 None,
606 ]
607 | None = None,
608 ) -> None:
609 """Initialize DictRefsContainer with refs dictionary and optional logger."""
610 super().__init__(logger=logger)
611 self._refs = refs
612 self._peeled: dict[Ref, ObjectID] = {}
613 self._watchers: set[Any] = set()
615 def allkeys(self) -> set[Ref]:
616 """Return all reference keys."""
617 return set(self._refs.keys())
619 def read_loose_ref(self, name: Ref) -> bytes | None:
620 """Read a loose reference."""
621 return self._refs.get(name, None)
623 def get_packed_refs(self) -> dict[Ref, ObjectID]:
624 """Get packed references."""
625 return {}
627 def _notify(self, ref: bytes, newsha: bytes | None) -> None:
628 for watcher in self._watchers:
629 watcher._notify((ref, newsha))
631 def set_symbolic_ref(
632 self,
633 name: Ref,
634 other: Ref,
635 committer: bytes | None = None,
636 timestamp: int | None = None,
637 timezone: int | None = None,
638 message: bytes | None = None,
639 ) -> None:
640 """Make a ref point at another ref.
642 Args:
643 name: Name of the ref to set
644 other: Name of the ref to point at
645 committer: Optional committer name for reflog
646 timestamp: Optional timestamp for reflog
647 timezone: Optional timezone for reflog
648 message: Optional message for reflog
649 """
650 old = self.follow(name)[-1]
651 new = SYMREF + other
652 self._refs[name] = new
653 self._notify(name, new)
654 self._log(
655 name,
656 old,
657 new,
658 committer=committer,
659 timestamp=timestamp,
660 timezone=timezone,
661 message=message,
662 )
664 def set_if_equals(
665 self,
666 name: Ref,
667 old_ref: ObjectID | None,
668 new_ref: ObjectID,
669 committer: bytes | None = None,
670 timestamp: int | None = None,
671 timezone: int | None = None,
672 message: bytes | None = None,
673 ) -> bool:
674 """Set a refname to new_ref only if it currently equals old_ref.
676 This method follows all symbolic references, and can be used to perform
677 an atomic compare-and-swap operation.
679 Args:
680 name: The refname to set.
681 old_ref: The old sha the refname must refer to, or None to set
682 unconditionally.
683 new_ref: The new sha the refname will refer to.
684 committer: Optional committer name for reflog
685 timestamp: Optional timestamp for reflog
686 timezone: Optional timezone for reflog
687 message: Optional message for reflog
689 Returns:
690 True if the set was successful, False otherwise.
691 """
692 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
693 return False
694 # Only update the specific ref requested, not the whole chain
695 self._check_refname(name)
696 old = self._refs.get(name)
697 self._refs[name] = new_ref
698 self._notify(name, new_ref)
699 self._log(
700 name,
701 old,
702 new_ref,
703 committer=committer,
704 timestamp=timestamp,
705 timezone=timezone,
706 message=message,
707 )
708 return True
710 def add_if_new(
711 self,
712 name: Ref,
713 ref: ObjectID,
714 committer: bytes | None = None,
715 timestamp: int | None = None,
716 timezone: int | None = None,
717 message: bytes | None = None,
718 ) -> bool:
719 """Add a new reference only if it does not already exist.
721 Args:
722 name: Ref name
723 ref: Ref value
724 committer: Optional committer name for reflog
725 timestamp: Optional timestamp for reflog
726 timezone: Optional timezone for reflog
727 message: Optional message for reflog
729 Returns:
730 True if the add was successful, False otherwise.
731 """
732 if name in self._refs:
733 return False
734 self._refs[name] = ref
735 self._notify(name, ref)
736 self._log(
737 name,
738 None,
739 ref,
740 committer=committer,
741 timestamp=timestamp,
742 timezone=timezone,
743 message=message,
744 )
745 return True
747 def remove_if_equals(
748 self,
749 name: Ref,
750 old_ref: ObjectID | None,
751 committer: bytes | None = None,
752 timestamp: int | None = None,
753 timezone: int | None = None,
754 message: bytes | None = None,
755 ) -> bool:
756 """Remove a refname only if it currently equals old_ref.
758 This method does not follow symbolic references. It can be used to
759 perform an atomic compare-and-delete operation.
761 Args:
762 name: The refname to delete.
763 old_ref: The old sha the refname must refer to, or None to
764 delete unconditionally.
765 committer: Optional committer name for reflog
766 timestamp: Optional timestamp for reflog
767 timezone: Optional timezone for reflog
768 message: Optional message for reflog
770 Returns:
771 True if the delete was successful, False otherwise.
772 """
773 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
774 return False
775 try:
776 old = self._refs.pop(name)
777 except KeyError:
778 pass
779 else:
780 self._notify(name, None)
781 self._log(
782 name,
783 old,
784 None,
785 committer=committer,
786 timestamp=timestamp,
787 timezone=timezone,
788 message=message,
789 )
790 return True
792 def get_peeled(self, name: Ref) -> ObjectID | None:
793 """Get peeled version of a reference."""
794 return self._peeled.get(name)
796 def _update(self, refs: Mapping[Ref, ObjectID]) -> None:
797 """Update multiple refs; intended only for testing."""
798 # TODO(dborowitz): replace this with a public function that uses
799 # set_if_equal.
800 for ref, sha in refs.items():
801 self.set_if_equals(ref, None, sha)
803 def _update_peeled(self, peeled: Mapping[Ref, ObjectID]) -> None:
804 """Update cached peeled refs; intended only for testing."""
805 self._peeled.update(peeled)
808class DiskRefsContainer(RefsContainer):
809 """Refs container that reads refs from disk."""
811 def __init__(
812 self,
813 path: str | bytes | os.PathLike[str],
814 worktree_path: str | bytes | os.PathLike[str] | None = None,
815 logger: Callable[
816 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
817 ]
818 | None = None,
819 ) -> None:
820 """Initialize DiskRefsContainer."""
821 super().__init__(logger=logger)
822 # Convert path-like objects to strings, then to bytes for Git compatibility
823 self.path = os.fsencode(os.fspath(path))
824 if worktree_path is None:
825 self.worktree_path = self.path
826 else:
827 self.worktree_path = os.fsencode(os.fspath(worktree_path))
828 self._packed_refs: dict[Ref, ObjectID] | None = None
829 self._peeled_refs: dict[Ref, ObjectID] | None = None
831 def __repr__(self) -> str:
832 """Return string representation of DiskRefsContainer."""
833 return f"{self.__class__.__name__}({self.path!r})"
835 def _iter_dir(
836 self,
837 path: bytes,
838 base: bytes,
839 dir_filter: Callable[[bytes], bool] | None = None,
840 ) -> Iterator[Ref]:
841 refspath = os.path.join(path, base.rstrip(b"/"))
842 prefix_len = len(os.path.join(path, b""))
844 for root, dirs, files in os.walk(refspath):
845 directory = root[prefix_len:]
846 if os.path.sep != "/":
847 directory = directory.replace(os.fsencode(os.path.sep), b"/")
848 if dir_filter is not None:
849 dirs[:] = [
850 d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
851 ]
853 for filename in files:
854 refname = b"/".join([directory, filename])
855 if check_ref_format(Ref(refname)):
856 yield Ref(refname)
858 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[Ref]:
859 base = base.rstrip(b"/") + b"/"
860 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = []
861 if base != b"refs/":
862 path = self.worktree_path if is_per_worktree_ref(base) else self.path
863 search_paths.append((path, None))
864 elif self.worktree_path == self.path:
865 # Iterate through all the refs from the main worktree
866 search_paths.append((self.path, None))
867 else:
868 # Iterate through all the shared refs from the commondir, excluding per-worktree refs
869 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
870 # Iterate through all the per-worktree refs from the worktree's gitdir
871 search_paths.append((self.worktree_path, is_per_worktree_ref))
873 for path, dir_filter in search_paths:
874 yield from self._iter_dir(path, base, dir_filter=dir_filter)
876 def subkeys(self, base: Ref) -> set[Ref]:
877 """Return subkeys under a given base reference path."""
878 subkeys: set[Ref] = set()
880 for key in self._iter_loose_refs(base):
881 if key.startswith(base):
882 subkeys.add(Ref(key[len(base) :].strip(b"/")))
884 for key in self.get_packed_refs():
885 if key.startswith(base):
886 subkeys.add(Ref(key[len(base) :].strip(b"/")))
887 return subkeys
889 def allkeys(self) -> set[Ref]:
890 """Return all reference keys."""
891 allkeys: set[Ref] = set()
892 if os.path.exists(self.refpath(HEADREF)):
893 allkeys.add(Ref(HEADREF))
895 allkeys.update(self._iter_loose_refs())
896 allkeys.update(self.get_packed_refs())
897 return allkeys
899 def refpath(self, name: bytes) -> bytes:
900 """Return the disk path of a ref."""
901 path = name
902 if os.path.sep != "/":
903 path = path.replace(b"/", os.fsencode(os.path.sep))
905 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
906 return os.path.join(root_dir, path)
908 def get_packed_refs(self) -> dict[Ref, ObjectID]:
909 """Get contents of the packed-refs file.
911 Returns: Dictionary mapping ref names to SHA1s
913 Note: Will return an empty dictionary when no packed-refs file is
914 present.
915 """
916 # TODO: invalidate the cache on repacking
917 if self._packed_refs is None:
918 # set both to empty because we want _peeled_refs to be
919 # None if and only if _packed_refs is also None.
920 self._packed_refs = {}
921 self._peeled_refs = {}
922 path = os.path.join(self.path, b"packed-refs")
923 try:
924 f = GitFile(path, "rb")
925 except FileNotFoundError:
926 return {}
927 with f:
928 first_line = next(iter(f)).rstrip()
929 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
930 for sha, name, peeled in read_packed_refs_with_peeled(f):
931 self._packed_refs[name] = sha
932 if peeled:
933 self._peeled_refs[name] = peeled
934 else:
935 f.seek(0)
936 for sha, name in read_packed_refs(f):
937 self._packed_refs[name] = sha
938 return self._packed_refs
940 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
941 """Add the given refs as packed refs.
943 Args:
944 new_refs: A mapping of ref names to targets; if a target is None that
945 means remove the ref
946 """
947 if not new_refs:
948 return
950 path = os.path.join(self.path, b"packed-refs")
952 with GitFile(path, "wb") as f:
953 # reread cached refs from disk, while holding the lock
954 packed_refs = self.get_packed_refs().copy()
956 for ref, target in new_refs.items():
957 # sanity check
958 if ref == HEADREF:
959 raise ValueError("cannot pack HEAD")
961 # remove any loose refs pointing to this one -- please
962 # note that this bypasses remove_if_equals as we don't
963 # want to affect packed refs in here
964 with suppress(OSError):
965 os.remove(self.refpath(ref))
967 if target is not None:
968 packed_refs[ref] = target
969 else:
970 packed_refs.pop(ref, None)
972 write_packed_refs(f, packed_refs, self._peeled_refs)
974 self._packed_refs = packed_refs
976 def get_peeled(self, name: Ref) -> ObjectID | None:
977 """Return the cached peeled value of a ref, if available.
979 Args:
980 name: Name of the ref to peel
981 Returns: The peeled value of the ref. If the ref is known not point to
982 a tag, this will be the SHA the ref refers to. If the ref may point
983 to a tag, but no cached information is available, None is returned.
984 """
985 self.get_packed_refs()
986 if (
987 self._peeled_refs is None
988 or self._packed_refs is None
989 or name not in self._packed_refs
990 ):
991 # No cache: no peeled refs were read, or this ref is loose
992 return None
993 if name in self._peeled_refs:
994 return self._peeled_refs[name]
995 else:
996 # Known not peelable
997 return self[name]
999 def read_loose_ref(self, name: Ref) -> bytes | None:
1000 """Read a reference file and return its contents.
1002 If the reference file a symbolic reference, only read the first line of
1003 the file. Otherwise, only read the first 40 bytes.
1005 Args:
1006 name: the refname to read, relative to refpath
1007 Returns: The contents of the ref file, or None if the file does not
1008 exist.
1010 Raises:
1011 IOError: if any other error occurs
1012 """
1013 filename = self.refpath(name)
1014 try:
1015 with GitFile(filename, "rb") as f:
1016 header = f.read(len(SYMREF))
1017 if header == SYMREF:
1018 # Read only the first line
1019 return header + next(iter(f)).rstrip(b"\r\n")
1020 else:
1021 # Read only the first 40 bytes
1022 return header + f.read(40 - len(SYMREF))
1023 except (OSError, UnicodeError):
1024 # don't assume anything specific about the error; in
1025 # particular, invalid or forbidden paths can raise weird
1026 # errors depending on the specific operating system
1027 return None
1029 def _remove_packed_ref(self, name: Ref) -> None:
1030 if self._packed_refs is None:
1031 return
1032 filename = os.path.join(self.path, b"packed-refs")
1033 # reread cached refs from disk, while holding the lock
1034 f = GitFile(filename, "wb")
1035 try:
1036 self._packed_refs = None
1037 self.get_packed_refs()
1039 if self._packed_refs is None or name not in self._packed_refs:
1040 f.abort()
1041 return
1043 del self._packed_refs[name]
1044 if self._peeled_refs is not None:
1045 with suppress(KeyError):
1046 del self._peeled_refs[name]
1047 write_packed_refs(f, self._packed_refs, self._peeled_refs)
1048 f.close()
1049 except BaseException:
1050 f.abort()
1051 raise
1053 def set_symbolic_ref(
1054 self,
1055 name: Ref,
1056 other: Ref,
1057 committer: bytes | None = None,
1058 timestamp: int | None = None,
1059 timezone: int | None = None,
1060 message: bytes | None = None,
1061 ) -> None:
1062 """Make a ref point at another ref.
1064 Args:
1065 name: Name of the ref to set
1066 other: Name of the ref to point at
1067 committer: Optional committer name
1068 timestamp: Optional timestamp
1069 timezone: Optional timezone
1070 message: Optional message to describe the change
1071 """
1072 self._check_refname(name)
1073 self._check_refname(other)
1074 filename = self.refpath(name)
1075 f = GitFile(filename, "wb")
1076 try:
1077 f.write(SYMREF + other + b"\n")
1078 sha = self.follow(name)[-1]
1079 self._log(
1080 name,
1081 sha,
1082 sha,
1083 committer=committer,
1084 timestamp=timestamp,
1085 timezone=timezone,
1086 message=message,
1087 )
1088 except BaseException:
1089 f.abort()
1090 raise
1091 else:
1092 f.close()
1094 def set_if_equals(
1095 self,
1096 name: Ref,
1097 old_ref: ObjectID | None,
1098 new_ref: ObjectID,
1099 committer: bytes | None = None,
1100 timestamp: int | None = None,
1101 timezone: int | None = None,
1102 message: bytes | None = None,
1103 ) -> bool:
1104 """Set a refname to new_ref only if it currently equals old_ref.
1106 This method follows all symbolic references, and can be used to perform
1107 an atomic compare-and-swap operation.
1109 Args:
1110 name: The refname to set.
1111 old_ref: The old sha the refname must refer to, or None to set
1112 unconditionally.
1113 new_ref: The new sha the refname will refer to.
1114 committer: Optional committer name
1115 timestamp: Optional timestamp
1116 timezone: Optional timezone
1117 message: Set message for reflog
1118 Returns: True if the set was successful, False otherwise.
1119 """
1120 self._check_refname(name)
1121 try:
1122 realnames, _ = self.follow(name)
1123 realname = realnames[-1]
1124 except (KeyError, IndexError, SymrefLoop):
1125 realname = name
1126 filename = self.refpath(realname)
1128 # make sure none of the ancestor folders is in packed refs
1129 probe_ref = Ref(os.path.dirname(realname))
1130 packed_refs = self.get_packed_refs()
1131 while probe_ref:
1132 if packed_refs.get(probe_ref, None) is not None:
1133 raise NotADirectoryError(filename)
1134 probe_ref = Ref(os.path.dirname(probe_ref))
1136 ensure_dir_exists(os.path.dirname(filename))
1137 with GitFile(filename, "wb") as f:
1138 if old_ref is not None:
1139 try:
1140 # read again while holding the lock to handle race conditions
1141 orig_ref = self.read_loose_ref(realname)
1142 if orig_ref is None:
1143 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
1144 if orig_ref != old_ref:
1145 f.abort()
1146 return False
1147 except OSError:
1148 f.abort()
1149 raise
1151 # Check if ref already has the desired value while holding the lock
1152 # This avoids fsync when ref is unchanged but still detects lock conflicts
1153 current_ref = self.read_loose_ref(realname)
1154 if current_ref is None:
1155 current_ref = packed_refs.get(realname, None)
1157 if current_ref is not None and current_ref == new_ref:
1158 # Ref already has desired value, abort write to avoid fsync
1159 f.abort()
1160 return True
1162 try:
1163 f.write(new_ref + b"\n")
1164 except OSError:
1165 f.abort()
1166 raise
1167 self._log(
1168 realname,
1169 old_ref,
1170 new_ref,
1171 committer=committer,
1172 timestamp=timestamp,
1173 timezone=timezone,
1174 message=message,
1175 )
1176 return True
1178 def add_if_new(
1179 self,
1180 name: Ref,
1181 ref: ObjectID,
1182 committer: bytes | None = None,
1183 timestamp: int | None = None,
1184 timezone: int | None = None,
1185 message: bytes | None = None,
1186 ) -> bool:
1187 """Add a new reference only if it does not already exist.
1189 This method follows symrefs, and only ensures that the last ref in the
1190 chain does not exist.
1192 Args:
1193 name: The refname to set.
1194 ref: The new sha the refname will refer to.
1195 committer: Optional committer name
1196 timestamp: Optional timestamp
1197 timezone: Optional timezone
1198 message: Optional message for reflog
1199 Returns: True if the add was successful, False otherwise.
1200 """
1201 try:
1202 realnames, contents = self.follow(name)
1203 if contents is not None:
1204 return False
1205 realname = realnames[-1]
1206 except (KeyError, IndexError):
1207 realname = name
1208 self._check_refname(realname)
1209 filename = self.refpath(realname)
1210 ensure_dir_exists(os.path.dirname(filename))
1211 with GitFile(filename, "wb") as f:
1212 if os.path.exists(filename) or name in self.get_packed_refs():
1213 f.abort()
1214 return False
1215 try:
1216 f.write(ref + b"\n")
1217 except OSError:
1218 f.abort()
1219 raise
1220 else:
1221 self._log(
1222 name,
1223 None,
1224 ref,
1225 committer=committer,
1226 timestamp=timestamp,
1227 timezone=timezone,
1228 message=message,
1229 )
1230 return True
1232 def remove_if_equals(
1233 self,
1234 name: Ref,
1235 old_ref: ObjectID | None,
1236 committer: bytes | None = None,
1237 timestamp: int | None = None,
1238 timezone: int | None = None,
1239 message: bytes | None = None,
1240 ) -> bool:
1241 """Remove a refname only if it currently equals old_ref.
1243 This method does not follow symbolic references. It can be used to
1244 perform an atomic compare-and-delete operation.
1246 Args:
1247 name: The refname to delete.
1248 old_ref: The old sha the refname must refer to, or None to
1249 delete unconditionally.
1250 committer: Optional committer name
1251 timestamp: Optional timestamp
1252 timezone: Optional timezone
1253 message: Optional message
1254 Returns: True if the delete was successful, False otherwise.
1255 """
1256 self._check_refname(name)
1257 filename = self.refpath(name)
1258 ensure_dir_exists(os.path.dirname(filename))
1259 f = GitFile(filename, "wb")
1260 try:
1261 if old_ref is not None:
1262 orig_ref = self.read_loose_ref(name)
1263 if orig_ref is None:
1264 orig_ref = self.get_packed_refs().get(name)
1265 if orig_ref is None:
1266 orig_ref = ZERO_SHA
1267 if orig_ref != old_ref:
1268 return False
1270 # remove the reference file itself
1271 try:
1272 found = os.path.lexists(filename)
1273 except OSError:
1274 # may only be packed, or otherwise unstorable
1275 found = False
1277 if found:
1278 os.remove(filename)
1280 self._remove_packed_ref(name)
1281 self._log(
1282 name,
1283 old_ref,
1284 None,
1285 committer=committer,
1286 timestamp=timestamp,
1287 timezone=timezone,
1288 message=message,
1289 )
1290 finally:
1291 # never write, we just wanted the lock
1292 f.abort()
1294 # outside of the lock, clean-up any parent directory that might now
1295 # be empty. this ensures that re-creating a reference of the same
1296 # name of what was previously a directory works as expected
1297 parent = name
1298 while True:
1299 try:
1300 parent_bytes, _ = parent.rsplit(b"/", 1)
1301 parent = Ref(parent_bytes)
1302 except ValueError:
1303 break
1305 if parent == b"refs":
1306 break
1307 parent_filename = self.refpath(parent)
1308 try:
1309 os.rmdir(parent_filename)
1310 except OSError:
1311 # this can be caused by the parent directory being
1312 # removed by another process, being not empty, etc.
1313 # in any case, this is non fatal because we already
1314 # removed the reference, just ignore it
1315 break
1317 return True
1319 def pack_refs(self, all: bool = False) -> None:
1320 """Pack loose refs into packed-refs file.
1322 Args:
1323 all: If True, pack all refs. If False, only pack tags.
1324 """
1325 refs_to_pack: dict[Ref, ObjectID | None] = {}
1326 for ref in self.allkeys():
1327 if ref == HEADREF:
1328 # Never pack HEAD
1329 continue
1330 if all or ref.startswith(LOCAL_TAG_PREFIX):
1331 try:
1332 sha = self[ref]
1333 if sha:
1334 refs_to_pack[ref] = sha
1335 except KeyError:
1336 # Broken ref, skip it
1337 pass
1339 if refs_to_pack:
1340 self.add_packed_refs(refs_to_pack)
1343def _split_ref_line(line: bytes) -> tuple[ObjectID, Ref]:
1344 """Split a single ref line into a tuple of SHA1 and name."""
1345 fields = line.rstrip(b"\n\r").split(b" ")
1346 if len(fields) != 2:
1347 raise PackedRefsException(f"invalid ref line {line!r}")
1348 sha, name = fields
1349 if not valid_hexsha(sha):
1350 raise PackedRefsException(f"Invalid hex sha {sha!r}")
1351 if not check_ref_format(Ref(name)):
1352 raise PackedRefsException(f"invalid ref name {name!r}")
1353 return (ObjectID(sha), Ref(name))
1356def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[ObjectID, Ref]]:
1357 """Read a packed refs file.
1359 Args:
1360 f: file-like object to read from
1361 Returns: Iterator over tuples with SHA1s and ref names.
1362 """
1363 for line in f:
1364 if line.startswith(b"#"):
1365 # Comment
1366 continue
1367 if line.startswith(b"^"):
1368 raise PackedRefsException("found peeled ref in packed-refs without peeled")
1369 yield _split_ref_line(line)
1372def read_packed_refs_with_peeled(
1373 f: IO[bytes],
1374) -> Iterator[tuple[ObjectID, Ref, ObjectID | None]]:
1375 """Read a packed refs file including peeled refs.
1377 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
1378 with ref names, SHA1s, and peeled SHA1s (or None).
1380 Args:
1381 f: file-like object to read from, seek'ed to the second line
1382 """
1383 last = None
1384 for line in f:
1385 if line.startswith(b"#"):
1386 continue
1387 line = line.rstrip(b"\r\n")
1388 if line.startswith(b"^"):
1389 if not last:
1390 raise PackedRefsException("unexpected peeled ref line")
1391 if not valid_hexsha(line[1:]):
1392 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
1393 sha, name = _split_ref_line(last)
1394 last = None
1395 yield (sha, name, ObjectID(line[1:]))
1396 else:
1397 if last:
1398 sha, name = _split_ref_line(last)
1399 yield (sha, name, None)
1400 last = line
1401 if last:
1402 sha, name = _split_ref_line(last)
1403 yield (sha, name, None)
1406def write_packed_refs(
1407 f: IO[bytes],
1408 packed_refs: Mapping[Ref, ObjectID],
1409 peeled_refs: Mapping[Ref, ObjectID] | None = None,
1410) -> None:
1411 """Write a packed refs file.
1413 Args:
1414 f: empty file-like object to write to
1415 packed_refs: dict of refname to sha of packed refs to write
1416 peeled_refs: dict of refname to peeled value of sha
1417 """
1418 if peeled_refs is None:
1419 peeled_refs = {}
1420 else:
1421 f.write(b"# pack-refs with: peeled\n")
1422 for refname in sorted(packed_refs.keys()):
1423 f.write(git_line(packed_refs[refname], refname))
1424 if refname in peeled_refs:
1425 f.write(b"^" + peeled_refs[refname] + b"\n")
1428def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]:
1429 """Read info/refs file.
1431 Args:
1432 f: File-like object to read from
1434 Returns:
1435 Dictionary mapping ref names to SHA1s
1436 """
1437 ret: dict[Ref, ObjectID] = {}
1438 for line in f.readlines():
1439 (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
1440 ret[Ref(name)] = ObjectID(sha)
1441 return ret
1444def is_local_branch(x: bytes) -> bool:
1445 """Check if a ref name is a local branch."""
1446 return x.startswith(LOCAL_BRANCH_PREFIX)
1449def local_branch_name(name: bytes) -> Ref:
1450 """Build a full branch ref from a short name.
1452 Args:
1453 name: Short branch name (e.g., b"master") or full ref
1455 Returns:
1456 Full branch ref name (e.g., b"refs/heads/master")
1458 Examples:
1459 >>> local_branch_name(b"master")
1460 b'refs/heads/master'
1461 >>> local_branch_name(b"refs/heads/master")
1462 b'refs/heads/master'
1463 """
1464 if name.startswith(LOCAL_BRANCH_PREFIX):
1465 return Ref(name)
1466 return Ref(LOCAL_BRANCH_PREFIX + name)
1469def local_tag_name(name: bytes) -> Ref:
1470 """Build a full tag ref from a short name.
1472 Args:
1473 name: Short tag name (e.g., b"v1.0") or full ref
1475 Returns:
1476 Full tag ref name (e.g., b"refs/tags/v1.0")
1478 Examples:
1479 >>> local_tag_name(b"v1.0")
1480 b'refs/tags/v1.0'
1481 >>> local_tag_name(b"refs/tags/v1.0")
1482 b'refs/tags/v1.0'
1483 """
1484 if name.startswith(LOCAL_TAG_PREFIX):
1485 return Ref(name)
1486 return Ref(LOCAL_TAG_PREFIX + name)
1489def local_replace_name(name: bytes) -> Ref:
1490 """Build a full replace ref from a short name.
1492 Args:
1493 name: Short replace name (object SHA) or full ref
1495 Returns:
1496 Full replace ref name (e.g., b"refs/replace/<sha>")
1498 Examples:
1499 >>> local_replace_name(b"abc123")
1500 b'refs/replace/abc123'
1501 >>> local_replace_name(b"refs/replace/abc123")
1502 b'refs/replace/abc123'
1503 """
1504 if name.startswith(LOCAL_REPLACE_PREFIX):
1505 return Ref(name)
1506 return Ref(LOCAL_REPLACE_PREFIX + name)
1509def extract_branch_name(ref: bytes) -> bytes:
1510 """Extract branch name from a full branch ref.
1512 Args:
1513 ref: Full branch ref (e.g., b"refs/heads/master")
1515 Returns:
1516 Short branch name (e.g., b"master")
1518 Raises:
1519 ValueError: If ref is not a local branch
1521 Examples:
1522 >>> extract_branch_name(b"refs/heads/master")
1523 b'master'
1524 >>> extract_branch_name(b"refs/heads/feature/foo")
1525 b'feature/foo'
1526 """
1527 if not ref.startswith(LOCAL_BRANCH_PREFIX):
1528 raise ValueError(f"Not a local branch ref: {ref!r}")
1529 return ref[len(LOCAL_BRANCH_PREFIX) :]
1532def extract_tag_name(ref: bytes) -> bytes:
1533 """Extract tag name from a full tag ref.
1535 Args:
1536 ref: Full tag ref (e.g., b"refs/tags/v1.0")
1538 Returns:
1539 Short tag name (e.g., b"v1.0")
1541 Raises:
1542 ValueError: If ref is not a local tag
1544 Examples:
1545 >>> extract_tag_name(b"refs/tags/v1.0")
1546 b'v1.0'
1547 """
1548 if not ref.startswith(LOCAL_TAG_PREFIX):
1549 raise ValueError(f"Not a local tag ref: {ref!r}")
1550 return ref[len(LOCAL_TAG_PREFIX) :]
1553def shorten_ref_name(ref: bytes) -> bytes:
1554 """Convert a full ref name to its short form.
1556 Args:
1557 ref: Full ref name (e.g., b"refs/heads/master")
1559 Returns:
1560 Short ref name (e.g., b"master")
1562 Examples:
1563 >>> shorten_ref_name(b"refs/heads/master")
1564 b'master'
1565 >>> shorten_ref_name(b"refs/remotes/origin/main")
1566 b'origin/main'
1567 >>> shorten_ref_name(b"refs/tags/v1.0")
1568 b'v1.0'
1569 >>> shorten_ref_name(b"HEAD")
1570 b'HEAD'
1571 """
1572 if ref.startswith(LOCAL_BRANCH_PREFIX):
1573 return ref[len(LOCAL_BRANCH_PREFIX) :]
1574 elif ref.startswith(LOCAL_REMOTE_PREFIX):
1575 return ref[len(LOCAL_REMOTE_PREFIX) :]
1576 elif ref.startswith(LOCAL_TAG_PREFIX):
1577 return ref[len(LOCAL_TAG_PREFIX) :]
1578 return ref
1581def _set_origin_head(
1582 refs: RefsContainer, origin: bytes, origin_head: bytes | None
1583) -> None:
1584 # set refs/remotes/origin/HEAD
1585 origin_base = b"refs/remotes/" + origin + b"/"
1586 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
1587 origin_ref = Ref(origin_base + HEADREF)
1588 target_ref = Ref(origin_base + extract_branch_name(origin_head))
1589 if target_ref in refs:
1590 refs.set_symbolic_ref(origin_ref, target_ref)
1593def _set_default_branch(
1594 refs: RefsContainer,
1595 origin: bytes,
1596 origin_head: bytes | None,
1597 branch: bytes | None,
1598 ref_message: bytes | None,
1599) -> bytes:
1600 """Set the default branch."""
1601 origin_base = b"refs/remotes/" + origin + b"/"
1602 if branch:
1603 origin_ref = Ref(origin_base + branch)
1604 if origin_ref in refs:
1605 local_ref = Ref(local_branch_name(branch))
1606 refs.add_if_new(local_ref, refs[origin_ref], ref_message)
1607 head_ref = local_ref
1608 elif Ref(local_tag_name(branch)) in refs:
1609 head_ref = Ref(local_tag_name(branch))
1610 else:
1611 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
1612 elif origin_head:
1613 head_ref = Ref(origin_head)
1614 if origin_head.startswith(LOCAL_BRANCH_PREFIX):
1615 origin_ref = Ref(origin_base + extract_branch_name(origin_head))
1616 else:
1617 origin_ref = Ref(origin_head)
1618 try:
1619 refs.add_if_new(head_ref, refs[origin_ref], ref_message)
1620 except KeyError:
1621 pass
1622 else:
1623 raise ValueError("neither origin_head nor branch are provided")
1624 return head_ref
1627def _set_head(
1628 refs: RefsContainer, head_ref: bytes, ref_message: bytes | None
1629) -> ObjectID | None:
1630 if head_ref.startswith(LOCAL_TAG_PREFIX):
1631 # detach HEAD at specified tag
1632 head = refs[Ref(head_ref)]
1633 if isinstance(head, Tag):
1634 _cls, obj = head.object
1635 head = obj.get_object(obj).id
1636 del refs[HEADREF]
1637 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1638 else:
1639 # set HEAD to specific branch
1640 try:
1641 head = refs[Ref(head_ref)]
1642 refs.set_symbolic_ref(HEADREF, Ref(head_ref))
1643 refs.set_if_equals(HEADREF, None, head, message=ref_message)
1644 except KeyError:
1645 head = None
1646 return head
1649def _import_remote_refs(
1650 refs_container: RefsContainer,
1651 remote_name: str,
1652 refs: Mapping[Ref, ObjectID | None],
1653 message: bytes | None = None,
1654 prune: bool = False,
1655 prune_tags: bool = False,
1656) -> None:
1657 from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs
1659 stripped_refs = strip_peeled_refs(refs)
1660 branches: dict[Ref, ObjectID | None] = {
1661 Ref(extract_branch_name(n)): v
1662 for (n, v) in stripped_refs.items()
1663 if n.startswith(LOCAL_BRANCH_PREFIX)
1664 }
1665 refs_container.import_refs(
1666 Ref(b"refs/remotes/" + remote_name.encode()),
1667 branches,
1668 message=message,
1669 prune=prune,
1670 )
1671 tags: dict[Ref, ObjectID | None] = {
1672 Ref(extract_tag_name(n)): v
1673 for (n, v) in stripped_refs.items()
1674 if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX)
1675 }
1676 refs_container.import_refs(
1677 Ref(LOCAL_TAG_PREFIX), tags, message=message, prune=prune_tags
1678 )
1681class locked_ref:
1682 """Lock a ref while making modifications.
1684 Works as a context manager.
1685 """
1687 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
1688 """Initialize a locked ref.
1690 Args:
1691 refs_container: The DiskRefsContainer to lock the ref in
1692 refname: The ref name to lock
1693 """
1694 self._refs_container = refs_container
1695 self._refname = refname
1696 self._file: _GitFile | None = None
1697 self._realname: Ref | None = None
1698 self._deleted = False
1700 def __enter__(self) -> "locked_ref":
1701 """Enter the context manager and acquire the lock.
1703 Returns:
1704 This locked_ref instance
1706 Raises:
1707 OSError: If the lock cannot be acquired
1708 """
1709 self._refs_container._check_refname(self._refname)
1710 try:
1711 realnames, _ = self._refs_container.follow(self._refname)
1712 self._realname = realnames[-1]
1713 except (KeyError, IndexError, SymrefLoop):
1714 self._realname = self._refname
1716 filename = self._refs_container.refpath(self._realname)
1717 ensure_dir_exists(os.path.dirname(filename))
1718 f = GitFile(filename, "wb")
1719 self._file = f
1720 return self
1722 def __exit__(
1723 self,
1724 exc_type: type | None,
1725 exc_value: BaseException | None,
1726 traceback: types.TracebackType | None,
1727 ) -> None:
1728 """Exit the context manager and release the lock.
1730 Args:
1731 exc_type: Type of exception if one occurred
1732 exc_value: Exception instance if one occurred
1733 traceback: Traceback if an exception occurred
1734 """
1735 if self._file:
1736 if exc_type is not None or self._deleted:
1737 self._file.abort()
1738 else:
1739 self._file.close()
1741 def get(self) -> bytes | None:
1742 """Get the current value of the ref."""
1743 if not self._file:
1744 raise RuntimeError("locked_ref not in context")
1746 assert self._realname is not None
1747 current_ref = self._refs_container.read_loose_ref(self._realname)
1748 if current_ref is None:
1749 current_ref = self._refs_container.get_packed_refs().get(
1750 self._realname, None
1751 )
1752 return current_ref
1754 def ensure_equals(self, expected_value: bytes | None) -> bool:
1755 """Ensure the ref currently equals the expected value.
1757 Args:
1758 expected_value: The expected current value of the ref
1759 Returns:
1760 True if the ref equals the expected value, False otherwise
1761 """
1762 current_value = self.get()
1763 return current_value == expected_value
1765 def set(self, new_ref: bytes) -> None:
1766 """Set the ref to a new value.
1768 Args:
1769 new_ref: The new SHA1 or symbolic ref value
1770 """
1771 if not self._file:
1772 raise RuntimeError("locked_ref not in context")
1774 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
1775 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
1777 self._file.seek(0)
1778 self._file.truncate()
1779 self._file.write(new_ref + b"\n")
1780 self._deleted = False
1782 def set_symbolic_ref(self, target: Ref) -> None:
1783 """Make this ref point at another ref.
1785 Args:
1786 target: Name of the ref to point at
1787 """
1788 if not self._file:
1789 raise RuntimeError("locked_ref not in context")
1791 self._refs_container._check_refname(target)
1792 self._file.seek(0)
1793 self._file.truncate()
1794 self._file.write(SYMREF + target + b"\n")
1795 self._deleted = False
1797 def delete(self) -> None:
1798 """Delete the ref file while holding the lock."""
1799 if not self._file:
1800 raise RuntimeError("locked_ref not in context")
1802 # Delete the actual ref file while holding the lock
1803 if self._realname:
1804 filename = self._refs_container.refpath(self._realname)
1805 try:
1806 if os.path.lexists(filename):
1807 os.remove(filename)
1808 except FileNotFoundError:
1809 pass
1810 self._refs_container._remove_packed_ref(self._realname)
1812 self._deleted = True
1815class NamespacedRefsContainer(RefsContainer):
1816 """Wrapper that adds namespace prefix to all ref operations.
1818 This implements Git's GIT_NAMESPACE feature, which stores refs under
1819 refs/namespaces/<namespace>/ and filters operations to only show refs
1820 within that namespace.
1822 Example:
1823 With namespace "foo", a ref "refs/heads/master" is stored as
1824 "refs/namespaces/foo/refs/heads/master" in the underlying container.
1825 """
1827 def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
1828 """Initialize NamespacedRefsContainer.
1830 Args:
1831 refs: The underlying refs container to wrap
1832 namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
1833 """
1834 super().__init__(logger=refs._logger)
1835 self._refs = refs
1836 # Build namespace prefix: refs/namespaces/<namespace>/
1837 # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
1838 namespace_parts = namespace.split(b"/")
1839 self._namespace_prefix = b""
1840 for part in namespace_parts:
1841 self._namespace_prefix += b"refs/namespaces/" + part + b"/"
1843 def _apply_namespace(self, name: bytes) -> bytes:
1844 """Apply namespace prefix to a ref name."""
1845 # HEAD and other special refs are not namespaced
1846 if name == HEADREF or not name.startswith(b"refs/"):
1847 return name
1848 return self._namespace_prefix + name
1850 def _strip_namespace(self, name: bytes) -> bytes | None:
1851 """Remove namespace prefix from a ref name.
1853 Returns None if the ref is not in our namespace.
1854 """
1855 # HEAD and other special refs are not namespaced
1856 if name == HEADREF or not name.startswith(b"refs/"):
1857 return name
1858 if name.startswith(self._namespace_prefix):
1859 return name[len(self._namespace_prefix) :]
1860 return None
1862 def allkeys(self) -> set[Ref]:
1863 """Return all reference keys in this namespace."""
1864 keys: set[Ref] = set()
1865 for key in self._refs.allkeys():
1866 stripped = self._strip_namespace(key)
1867 if stripped is not None:
1868 keys.add(Ref(stripped))
1869 return keys
1871 def read_loose_ref(self, name: Ref) -> bytes | None:
1872 """Read a loose reference."""
1873 return self._refs.read_loose_ref(Ref(self._apply_namespace(name)))
1875 def get_packed_refs(self) -> dict[Ref, ObjectID]:
1876 """Get packed refs within this namespace."""
1877 packed: dict[Ref, ObjectID] = {}
1878 for name, value in self._refs.get_packed_refs().items():
1879 stripped = self._strip_namespace(name)
1880 if stripped is not None:
1881 packed[Ref(stripped)] = value
1882 return packed
1884 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
1885 """Add packed refs with namespace prefix."""
1886 namespaced_refs: dict[Ref, ObjectID | None] = {
1887 Ref(self._apply_namespace(name)): value for name, value in new_refs.items()
1888 }
1889 self._refs.add_packed_refs(namespaced_refs)
1891 def get_peeled(self, name: Ref) -> ObjectID | None:
1892 """Return the cached peeled value of a ref."""
1893 return self._refs.get_peeled(Ref(self._apply_namespace(name)))
1895 def set_symbolic_ref(
1896 self,
1897 name: Ref,
1898 other: Ref,
1899 committer: bytes | None = None,
1900 timestamp: int | None = None,
1901 timezone: int | None = None,
1902 message: bytes | None = None,
1903 ) -> None:
1904 """Make a ref point at another ref."""
1905 self._refs.set_symbolic_ref(
1906 Ref(self._apply_namespace(name)),
1907 Ref(self._apply_namespace(other)),
1908 committer=committer,
1909 timestamp=timestamp,
1910 timezone=timezone,
1911 message=message,
1912 )
1914 def set_if_equals(
1915 self,
1916 name: Ref,
1917 old_ref: ObjectID | None,
1918 new_ref: ObjectID,
1919 committer: bytes | None = None,
1920 timestamp: int | None = None,
1921 timezone: int | None = None,
1922 message: bytes | None = None,
1923 ) -> bool:
1924 """Set a refname to new_ref only if it currently equals old_ref."""
1925 return self._refs.set_if_equals(
1926 Ref(self._apply_namespace(name)),
1927 old_ref,
1928 new_ref,
1929 committer=committer,
1930 timestamp=timestamp,
1931 timezone=timezone,
1932 message=message,
1933 )
1935 def add_if_new(
1936 self,
1937 name: Ref,
1938 ref: ObjectID,
1939 committer: bytes | None = None,
1940 timestamp: int | None = None,
1941 timezone: int | None = None,
1942 message: bytes | None = None,
1943 ) -> bool:
1944 """Add a new reference only if it does not already exist."""
1945 return self._refs.add_if_new(
1946 Ref(self._apply_namespace(name)),
1947 ref,
1948 committer=committer,
1949 timestamp=timestamp,
1950 timezone=timezone,
1951 message=message,
1952 )
1954 def remove_if_equals(
1955 self,
1956 name: Ref,
1957 old_ref: ObjectID | None,
1958 committer: bytes | None = None,
1959 timestamp: int | None = None,
1960 timezone: int | None = None,
1961 message: bytes | None = None,
1962 ) -> bool:
1963 """Remove a refname only if it currently equals old_ref."""
1964 return self._refs.remove_if_equals(
1965 Ref(self._apply_namespace(name)),
1966 old_ref,
1967 committer=committer,
1968 timestamp=timestamp,
1969 timezone=timezone,
1970 message=message,
1971 )
1973 def pack_refs(self, all: bool = False) -> None:
1974 """Pack loose refs into packed-refs file.
1976 Note: This packs all refs in the underlying container, not just
1977 those in the namespace.
1978 """
1979 self._refs.pack_refs(all=all)
1982def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
1983 """Filter refs to only include those with a given prefix.
1985 Args:
1986 refs: A dictionary of refs.
1987 prefixes: The prefixes to filter by.
1988 """
1989 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
1990 return filtered
1993def is_per_worktree_ref(ref: bytes) -> bool:
1994 """Returns whether a reference is stored per worktree or not.
1996 Per-worktree references are:
1997 - all pseudorefs, e.g. HEAD
1998 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
2000 All refs starting with "refs/" are shared, except for the ones listed above.
2002 See https://git-scm.com/docs/git-worktree#_refs.
2003 """
2004 return not ref.startswith(b"refs/") or ref.startswith(
2005 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
2006 )