Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
# Names exported by ``from dulwich.refs import *``.
__all__ = [
    "HEADREF",
    "LOCAL_BRANCH_PREFIX",
    "LOCAL_NOTES_PREFIX",
    "LOCAL_REMOTE_PREFIX",
    "LOCAL_REPLACE_PREFIX",
    "LOCAL_TAG_PREFIX",
    "SYMREF",
    "DictRefsContainer",
    "DiskRefsContainer",
    "NamespacedRefsContainer",
    "Ref",
    "RefsContainer",
    "SymrefLoop",
    "check_ref_format",
    "extract_branch_name",
    "extract_tag_name",
    "filter_ref_prefix",
    "is_local_branch",
    "is_per_worktree_ref",
    "local_branch_name",
    "local_replace_name",
    "local_tag_name",
    "parse_remote_ref",
    "parse_symref_value",
    "read_info_refs",
    "read_packed_refs",
    "read_packed_refs_with_peeled",
    "set_ref_from_raw",
    "shorten_ref_name",
    "write_packed_refs",
]
58import os
59import sys
60import types
61import warnings
62from collections.abc import Callable, Iterable, Iterator, Mapping
63from contextlib import suppress
64from typing import (
65 IO,
66 TYPE_CHECKING,
67 Any,
68 BinaryIO,
69 NewType,
70 TypeVar,
71)
73if sys.version_info >= (3, 11):
74 from typing import Self
75else:
76 from typing_extensions import Self
78if TYPE_CHECKING:
79 from .file import _GitFile
81from .errors import PackedRefsException, RefFormatError
82from .file import GitFile, ensure_dir_exists
83from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
# Distinct static type for ref names, declared over ``bytes``.
Ref = NewType("Ref", bytes)

# Constrained to the two ref-dictionary shapes used in this module.
T = TypeVar("T", dict[Ref, ObjectID], dict[Ref, ObjectID | None])

HEADREF = Ref(b"HEAD")

# Marker that prefixes the contents of a symbolic ref.
SYMREF = b"ref: "

# Well-known ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
LOCAL_REPLACE_PREFIX = b"refs/replace/"

# Byte values that may not appear anywhere in a ref name.
BAD_REF_CHARS: set[int] = set(b"\177 ~^:?*[")
class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    Attributes:
      ref: The ref name passed to the constructor
      depth: The depth value passed to the constructor
    """

    def __init__(self, ref: bytes, depth: int) -> None:
        """Initialize SymrefLoop exception.

        Args:
          ref: Name of the ref that could not be resolved
          depth: Number of symref hops taken before giving up
        """
        # Hand the arguments to Exception so ``args`` and ``str()`` are
        # populated and copy/pickle can rebuild the instance (they call
        # ``cls(*args)``, which fails when ``args`` is empty).
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth
def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target from the raw contents of a symbolic ref.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The target with trailing CR/LF characters removed
    Raises:
      ValueError: If ``contents`` does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")
def check_ref_format(refname: Ref) -> bool:
    """Check if a refname is correctly formatted.

    Implements all the same rules as git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # Rules that look at the name as a whole.
    if refname == b"@":
        return False
    if b"/" not in refname:  # type: ignore[comparison-overlap]
        return False
    # A name containing a slash is never empty, so indexing is safe here.
    if refname[-1] in b"/.":
        return False
    forbidden_sequences = (b"..", b"@{", b"\\")
    if any(sequence in refname for sequence in forbidden_sequences):
        return False
    # Control characters (below 0o40) and the explicitly banned bytes.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # Rules that apply to each slash-separated component.
    for component in refname.split(b"/"):
        if (
            not component
            or component.startswith(b".")
            or component.endswith(b".lock")
        ):
            return False
    return True
159def _collapse_slashes(refname: bytes) -> bytes:
160 """Collapse runs of consecutive slashes in a ref name into a single slash."""
161 return b"/".join(component for component in refname.split(b"/") if component)
def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); everything after the first slash
      that follows the remote name belongs to the branch name

    Raises:
      ValueError: If ref does not start with the remote prefix, or has no
        slash separating the remote name from the branch name
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    without_prefix = ref[len(LOCAL_REMOTE_PREFIX) :]
    remote_name, slash, branch_name = without_prefix.partition(b"/")
    if not slash:
        raise ValueError(f"Invalid remote ref format: {ref!r}")
    return (remote_name, branch_name)
def set_ref_from_raw(refs: "RefsContainer", name: Ref, raw_ref: bytes) -> None:
    """Store a raw ref value in a container.

    Values starting with the symref marker are stored through
    ``set_symbolic_ref``; anything else is assigned as a direct ObjectID.

    Args:
      refs: The RefsContainer to set the ref in
      name: The ref name to set
      raw_ref: The raw ref value (either a symbolic ref or an ObjectID)
    """
    if not raw_ref.startswith(SYMREF):
        # Direct ObjectID
        refs[name] = ObjectID(raw_ref)
        return
    # Symbolic ref: everything after the marker is the target name
    refs.set_symbolic_ref(name, Ref(raw_ref[len(SYMREF) :]))
class RefsContainer:
    """A container for refs."""

    def __init__(
        self,
        logger: Callable[
            [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
        ]
        | None = None,
    ) -> None:
        """Initialize RefsContainer with optional logger function."""
        self._logger = logger

    def _log(
        self,
        ref: bytes,
        old_sha: bytes | None,
        new_sha: bytes | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Report a ref change to the logger.

        Nothing is reported when no logger is configured or when no message
        is given.
        """
        if self._logger is None:
            return
        if message is None:
            return
        # Use ZERO_SHA for None values, matching git behavior
        if old_sha is None:
            old_sha = ZERO_SHA
        if new_sha is None:
            new_sha = ZERO_SHA
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name: Ref) -> ObjectID | None:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation keeps no cache and always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: Mapping[Ref, ObjectID | None],
        committer: bytes | None = None,
        timestamp: bytes | None = None,
        timezone: bytes | None = None,
        message: bytes | None = None,
        prune: bool = False,
    ) -> None:
        """Import refs from another repository.

        Entries of ``other`` with a value are written below ``base``; entries
        whose value is None are removed from below ``base``.

        Args:
          base: Base ref to import into (e.g., b'refs/remotes/origin')
          other: Dictionary of refs to import
          committer: Optional committer for reflog (accepted but currently
            not forwarded to the underlying updates)
          timestamp: Optional timestamp for reflog (currently not forwarded)
          timezone: Optional timezone for reflog (currently not forwarded)
          message: Optional message for reflog
          prune: If True, remove refs not in other
        """
        # Strip a trailing slash so joining ``base`` with a ref name below
        # does not produce a malformed ref with an empty path component
        # (e.g. b'refs/tags/' + b'/' + b'v1.0' -> b'refs/tags//v1.0').
        base = Ref(base.rstrip(b"/"))
        to_delete: set[Ref] = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                # Explicit deletion request: it has to survive until the
                # removal loop below.
                to_delete.add(name)
            else:
                self.set_if_equals(
                    Ref(b"/".join((base, name))), None, value, message=message
                )
                # The ref exists on the other side, so pruning must keep it.
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(Ref(b"/".join((base, ref))), None, message=message)

    def allkeys(self) -> set[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self) -> Iterator[Ref]:
        """Iterate over all reference keys."""
        return iter(self.allkeys())

    def keys(self, base: Ref | None = None) -> set[Ref]:
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        return self.allkeys()

    def subkeys(self, base: Ref) -> set[Ref]:
        """Refs present in this container under a base.

        Only refs that sit below ``base`` as a path are returned; a ref that
        merely shares a textual prefix with it (b'refs/heads-old/x' for base
        b'refs/heads') is not. A trailing slash on ``base`` is ignored.

        Args:
          base: The base to return refs under.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        stem = base.rstrip(b"/")
        # An empty base selects every ref and strips nothing.
        prefix = stem + b"/" if stem else b""
        prefix_len = len(prefix)
        return {
            Ref(refname[prefix_len:])
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base: Ref | None = None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out.
        """
        ret: dict[Ref, ObjectID] = {}
        keys = self.keys(base)
        base_bytes: bytes
        if base is None:
            base_bytes = b""
        else:
            base_bytes = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[Ref((base_bytes + b"/" + key).strip(b"/"))]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name: Ref) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, Ref(b"refs/stash")):
            return
        if not name.startswith(b"refs/"):
            raise RefFormatError(name)
        rest = Ref(name[5:])
        if check_ref_format(rest):
            return
        # As of Dulwich 1.2.3 check_ref_format rejects empty path components
        # (e.g. b'refs/tags//v1.0'). Such names were silently accepted before,
        # and some callers (e.g. older Poetry releases) still construct them.
        # Warn rather than raise for now if collapsing repeated slashes would
        # make the name valid, so the only defect is empty components.
        if (
            b"//" in name  # type: ignore[comparison-overlap]
            and check_ref_format(Ref(_collapse_slashes(rest)))
        ):
            warnings.warn(
                f"Ref name {name!r} contains empty path components; "
                "this will be rejected in a future version of Dulwich.",
                DeprecationWarning,
                stacklevel=3,
            )
            return
        raise RefFormatError(name)

    def read_ref(self, refname: Ref) -> bytes | None:
        """Read a reference without following any references.

        The loose ref is consulted first; the packed refs are the fallback.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name: Ref) -> bytes | None:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name: Ref) -> tuple[list[Ref], ObjectID | None]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        Raises:
          SymrefLoop: if more than five refs had to be read
        """
        contents: bytes | None = SYMREF + name
        depth = 0
        refnames: list[Ref] = []
        while contents and contents.startswith(SYMREF):
            refname = Ref(contents[len(SYMREF) :])
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, ObjectID(contents) if contents else None

    def __contains__(self, refname: Ref) -> bool:
        """Check if a reference exists."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name: Ref) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference does not resolve to a SHA
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name: Ref,
        old_ref: ObjectID | None,
        new_ref: ObjectID,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name: Ref, ref: ObjectID) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ``ref`` is neither a valid hex sha nor a symref value
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name: Ref,
        old_ref: ObjectID | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name: Ref) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self) -> dict[Ref, Ref]:
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret: dict[Ref, Ref] = {}
        for src in self.allkeys():
            ref_value = self.read_ref(src)
            if ref_value is None:
                # The ref vanished between listing and reading; skip it
                # instead of relying on an assert that -O would strip.
                continue
            try:
                dst = parse_symref_value(ref_value)
            except ValueError:
                # Not a symbolic ref
                continue
            ret[src] = Ref(dst)
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)
class DictRefsContainer(RefsContainer):
    """RefsContainer backed by a simple dict.

    All refs live in the dictionary handed to the constructor; the packed
    refs are always empty. This container is not threadsafe.
    """

    def __init__(
        self,
        refs: dict[Ref, bytes],
        logger: Callable[
            [
                bytes,
                bytes | None,
                bytes | None,
                bytes | None,
                int | None,
                int | None,
                bytes | None,
            ],
            None,
        ]
        | None = None,
    ) -> None:
        """Initialize DictRefsContainer with refs dictionary and optional logger."""
        super().__init__(logger=logger)
        # The dictionary is used as-is, not copied.
        self._refs = refs
        self._peeled: dict[Ref, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self) -> set[Ref]:
        """Return all reference keys."""
        return set(self._refs)

    def read_loose_ref(self, name: Ref) -> bytes | None:
        """Return the stored value for ``name``, or None if it is absent."""
        return self._refs.get(name)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Return the packed references, which are always empty here."""
        return {}

    def _notify(self, ref: bytes, newsha: bytes | None) -> None:
        """Pass a (ref, newsha) pair to every registered watcher."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
        """
        # Resolve the current value before it is overwritten.
        _, previous = self.follow(name)
        symref_value = SYMREF + other
        self._refs[name] = symref_value
        self._notify(name, symref_value)
        self._log(
            name,
            previous,
            symref_value,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: Ref,
        old_ref: ObjectID | None,
        new_ref: ObjectID,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        Only the entry stored under ``name`` is compared and replaced;
        symbolic references are not followed.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the set was successful, False otherwise.
        """
        if old_ref is not None:
            # A missing ref compares as ZERO_SHA.
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the add was successful, False otherwise.
        """
        if name in self._refs:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name: Ref,
        old_ref: ObjectID | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. Removing a ref that
        is not present counts as success, but nothing is notified or logged.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the delete was successful, False otherwise.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name: Ref) -> ObjectID | None:
        """Return the cached peeled value for ``name``, or None if not cached."""
        return self._peeled.get(name)

    def _update(self, refs: Mapping[Ref, ObjectID]) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, sha in refs.items():
            self.set_if_equals(refname, None, sha)

    def _update_peeled(self, peeled: Mapping[Ref, ObjectID]) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
849class DiskRefsContainer(RefsContainer):
850 """Refs container that reads refs from disk."""
852 def __init__(
853 self,
854 path: str | bytes | os.PathLike[str],
855 worktree_path: str | bytes | os.PathLike[str] | None = None,
856 logger: Callable[
857 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
858 ]
859 | None = None,
860 ) -> None:
861 """Initialize DiskRefsContainer."""
862 super().__init__(logger=logger)
863 # Convert path-like objects to strings, then to bytes for Git compatibility
864 self.path = os.fsencode(os.fspath(path))
865 if worktree_path is None:
866 self.worktree_path = self.path
867 else:
868 self.worktree_path = os.fsencode(os.fspath(worktree_path))
869 self._packed_refs: dict[Ref, ObjectID] | None = None
870 self._peeled_refs: dict[Ref, ObjectID] | None = None
872 def __repr__(self) -> str:
873 """Return string representation of DiskRefsContainer."""
874 return f"{self.__class__.__name__}({self.path!r})"
876 def _iter_dir(
877 self,
878 path: bytes,
879 base: bytes,
880 dir_filter: Callable[[bytes], bool] | None = None,
881 ) -> Iterator[Ref]:
882 refspath = os.path.join(path, base.rstrip(b"/"))
883 prefix_len = len(os.path.join(path, b""))
885 for root, dirs, files in os.walk(refspath):
886 directory = root[prefix_len:]
887 if os.path.sep != "/":
888 directory = directory.replace(os.fsencode(os.path.sep), b"/")
889 if dir_filter is not None:
890 dirs[:] = [
891 d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
892 ]
894 for filename in files:
895 refname = b"/".join([directory, filename])
896 if check_ref_format(Ref(refname)):
897 yield Ref(refname)
899 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[Ref]:
900 base = base.rstrip(b"/") + b"/"
901 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = []
902 if base != b"refs/":
903 path = self.worktree_path if is_per_worktree_ref(base) else self.path
904 search_paths.append((path, None))
905 elif self.worktree_path == self.path:
906 # Iterate through all the refs from the main worktree
907 search_paths.append((self.path, None))
908 else:
909 # Iterate through all the shared refs from the commondir, excluding per-worktree refs
910 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
911 # Iterate through all the per-worktree refs from the worktree's gitdir
912 search_paths.append((self.worktree_path, is_per_worktree_ref))
914 for path, dir_filter in search_paths:
915 yield from self._iter_dir(path, base, dir_filter=dir_filter)
917 def subkeys(self, base: Ref) -> set[Ref]:
918 """Return subkeys under a given base reference path."""
919 subkeys: set[Ref] = set()
921 for key in self._iter_loose_refs(base):
922 if key.startswith(base):
923 subkeys.add(Ref(key[len(base) :].strip(b"/")))
925 for key in self.get_packed_refs():
926 if key.startswith(base):
927 subkeys.add(Ref(key[len(base) :].strip(b"/")))
928 return subkeys
930 def allkeys(self) -> set[Ref]:
931 """Return all reference keys."""
932 allkeys: set[Ref] = set()
933 if os.path.exists(self.refpath(HEADREF)):
934 allkeys.add(Ref(HEADREF))
936 allkeys.update(self._iter_loose_refs())
937 allkeys.update(self.get_packed_refs())
938 return allkeys
940 def refpath(self, name: bytes) -> bytes:
941 """Return the disk path of a ref."""
942 path = name
943 if os.path.sep != "/":
944 path = path.replace(b"/", os.fsencode(os.path.sep))
946 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
947 return os.path.join(root_dir, path)
949 def get_packed_refs(self) -> dict[Ref, ObjectID]:
950 """Get contents of the packed-refs file.
952 Returns: Dictionary mapping ref names to SHA1s
954 Note: Will return an empty dictionary when no packed-refs file is
955 present.
956 """
957 # TODO: invalidate the cache on repacking
958 if self._packed_refs is None:
959 # set both to empty because we want _peeled_refs to be
960 # None if and only if _packed_refs is also None.
961 self._packed_refs = {}
962 self._peeled_refs = {}
963 path = os.path.join(self.path, b"packed-refs")
964 try:
965 f = GitFile(path, "rb")
966 except FileNotFoundError:
967 return {}
968 with f:
969 first_line = next(iter(f)).rstrip()
970 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
971 for sha, name, peeled in read_packed_refs_with_peeled(f):
972 self._packed_refs[name] = sha
973 if peeled:
974 self._peeled_refs[name] = peeled
975 else:
976 f.seek(0)
977 for sha, name in read_packed_refs(f):
978 self._packed_refs[name] = sha
979 return self._packed_refs
981 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
982 """Add the given refs as packed refs.
984 Args:
985 new_refs: A mapping of ref names to targets; if a target is None that
986 means remove the ref
987 """
988 if not new_refs:
989 return
991 path = os.path.join(self.path, b"packed-refs")
993 with GitFile(path, "wb") as f:
994 # reread cached refs from disk, while holding the lock
995 packed_refs = self.get_packed_refs().copy()
997 for ref, target in new_refs.items():
998 # sanity check
999 if ref == HEADREF:
1000 raise ValueError("cannot pack HEAD")
1002 # remove any loose refs pointing to this one -- please
1003 # note that this bypasses remove_if_equals as we don't
1004 # want to affect packed refs in here
1005 with suppress(OSError):
1006 os.remove(self.refpath(ref))
1008 if target is not None:
1009 packed_refs[ref] = target
1010 else:
1011 packed_refs.pop(ref, None)
1013 write_packed_refs(f, packed_refs, self._peeled_refs)
1015 self._packed_refs = packed_refs
1017 def get_peeled(self, name: Ref) -> ObjectID | None:
1018 """Return the cached peeled value of a ref, if available.
1020 Args:
1021 name: Name of the ref to peel
1022 Returns: The peeled value of the ref. If the ref is known not point to
1023 a tag, this will be the SHA the ref refers to. If the ref may point
1024 to a tag, but no cached information is available, None is returned.
1025 """
1026 self.get_packed_refs()
1027 if (
1028 self._peeled_refs is None
1029 or self._packed_refs is None
1030 or name not in self._packed_refs
1031 ):
1032 # No cache: no peeled refs were read, or this ref is loose
1033 return None
1034 if name in self._peeled_refs:
1035 return self._peeled_refs[name]
1036 else:
1037 # Known not peelable
1038 return self[name]
1040 def read_loose_ref(self, name: Ref) -> bytes | None:
1041 """Read a reference file and return its contents.
1043 If the reference file a symbolic reference, only read the first line of
1044 the file. Otherwise, read the hash (40 bytes for SHA1, 64 bytes for SHA256).
1046 Args:
1047 name: the refname to read, relative to refpath
1048 Returns: The contents of the ref file, or None if the file does not
1049 exist.
1051 Raises:
1052 IOError: if any other error occurs
1053 """
1054 filename = self.refpath(name)
1055 try:
1056 with GitFile(filename, "rb") as f:
1057 header = f.read(len(SYMREF))
1058 if header == SYMREF:
1059 # Read only the first line
1060 return header + next(iter(f)).rstrip(b"\r\n")
1061 else:
1062 # Read the entire line to get the full hash (handles both SHA1 and SHA256)
1063 f.seek(0)
1064 line = f.readline().rstrip(b"\r\n")
1065 return line
1066 except (OSError, UnicodeError):
1067 # don't assume anything specific about the error; in
1068 # particular, invalid or forbidden paths can raise weird
1069 # errors depending on the specific operating system
1070 return None
def _remove_packed_ref(self, name: Ref) -> None:
    """Drop ``name`` from the packed-refs file if it is listed there.

    Does nothing when no packed refs are currently cached. Otherwise the
    packed-refs file is opened for writing, the cache is reread, and the
    file is rewritten without ``name``; if ``name`` is not packed the
    write is aborted.

    Args:
      name: Name of the ref to remove from packed-refs
    """
    if self._packed_refs is None:
        return
    packed_path = os.path.join(self.path, b"packed-refs")
    lock = GitFile(packed_path, "wb")
    try:
        # reread cached refs from disk, while holding the lock
        self._packed_refs = None
        self.get_packed_refs()

        refreshed = self._packed_refs
        if refreshed is not None and name in refreshed:
            del refreshed[name]
            if self._peeled_refs is not None:
                try:
                    del self._peeled_refs[name]
                except KeyError:
                    # The ref had no peeled entry; nothing to drop
                    pass
            write_packed_refs(lock, self._packed_refs, self._peeled_refs)
            lock.close()
        else:
            # Nothing to remove: discard the pending write
            lock.abort()
    except BaseException:
        lock.abort()
        raise
def set_symbolic_ref(
    self,
    name: Ref,
    other: Ref,
    committer: bytes | None = None,
    timestamp: int | None = None,
    timezone: int | None = None,
    message: bytes | None = None,
) -> None:
    """Make a ref point at another ref.

    Args:
      name: Name of the ref to set
      other: Name of the ref to point at
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message to describe the change
    """
    self._check_refname(name)
    self._check_refname(other)
    filename = self.refpath(name)
    # Create missing parent directories before taking the lock, as the
    # other ref-writing methods do; otherwise opening the lock file fails
    # when the directory for ``name`` does not exist on disk yet.
    ensure_dir_exists(os.path.dirname(filename))
    f = GitFile(filename, "wb")
    try:
        f.write(SYMREF + other + b"\n")
        # The same SHA is logged as both old and new value.
        # NOTE(review): follow() runs before close(); presumably it still
        # resolves the previous on-disk value of ``name`` -- confirm.
        sha = self.follow(name)[-1]
        self._log(
            name,
            sha,
            sha,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    except BaseException:
        f.abort()
        raise
    else:
        f.close()
def set_if_equals(
    self,
    name: Ref,
    old_ref: ObjectID | None,
    new_ref: ObjectID,
    committer: bytes | None = None,
    timestamp: int | None = None,
    timezone: int | None = None,
    message: bytes | None = None,
) -> bool:
    """Set a refname to new_ref only if it currently equals old_ref.

    Symbolic references are followed, so the write lands on the last ref
    of the chain. Can be used as an atomic compare-and-swap.

    Args:
      name: The refname to set.
      old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
      new_ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Set message for reflog
    Returns: True if the set was successful, False otherwise.

    Raises:
      NotADirectoryError: if an ancestor path of the ref is itself a
        packed ref
    """
    self._check_refname(name)
    try:
        chain, _ = self.follow(name)
        realname = chain[-1]
    except (KeyError, IndexError, SymrefLoop):
        # Unresolvable chain: write to the given name directly
        realname = name
    filename = self.refpath(realname)

    # make sure none of the ancestor folders is in packed refs
    packed_refs = self.get_packed_refs()
    ancestor = Ref(os.path.dirname(realname))
    while ancestor:
        if packed_refs.get(ancestor, None) is not None:
            raise NotADirectoryError(filename)
        ancestor = Ref(os.path.dirname(ancestor))

    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as lock:
        if old_ref is not None:
            try:
                # read again while holding the lock to handle race conditions
                observed = self.read_loose_ref(realname)
                if observed is None:
                    observed = self.get_packed_refs().get(realname, ZERO_SHA)
                if observed != old_ref:
                    lock.abort()
                    return False
            except OSError:
                lock.abort()
                raise

        # Check if ref already has the desired value while holding the lock
        # This avoids fsync when ref is unchanged but still detects lock conflicts
        existing = self.read_loose_ref(realname)
        if existing is None:
            existing = packed_refs.get(realname, None)

        if existing is not None and existing == new_ref:
            # Ref already has desired value, abort write to avoid fsync
            lock.abort()
            return True

        try:
            lock.write(new_ref + b"\n")
        except OSError:
            lock.abort()
            raise
        self._log(
            realname,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True
def add_if_new(
    self,
    name: Ref,
    ref: ObjectID,
    committer: bytes | None = None,
    timestamp: int | None = None,
    timezone: int | None = None,
    message: bytes | None = None,
) -> bool:
    """Add a new reference only if it does not already exist.

    This method follows symrefs, and only ensures that the last ref in the
    chain does not exist.

    Args:
      name: The refname to set.
      ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message for reflog
    Returns: True if the add was successful, False otherwise.
    """
    try:
        realnames, contents = self.follow(name)
        if contents is not None:
            return False
        realname = realnames[-1]
    except (KeyError, IndexError):
        realname = name
    self._check_refname(realname)
    filename = self.refpath(realname)
    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as f:
        # Re-check existence while holding the lock. Both checks must look
        # at the resolved ref (the one whose file is being created), not
        # at the symref that was passed in.
        if os.path.exists(filename) or realname in self.get_packed_refs():
            f.abort()
            return False
        try:
            f.write(ref + b"\n")
        except OSError:
            f.abort()
            raise
        else:
            self._log(
                name,
                None,
                ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
    return True
def remove_if_equals(
    self,
    name: Ref,
    old_ref: ObjectID | None,
    committer: bytes | None = None,
    timestamp: int | None = None,
    timezone: int | None = None,
    message: bytes | None = None,
) -> bool:
    """Remove a refname only if it currently equals old_ref.

    Symbolic references are not followed. Can be used as an atomic
    compare-and-delete.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    filename = self.refpath(name)
    ensure_dir_exists(os.path.dirname(filename))
    lock = GitFile(filename, "wb")
    try:
        if old_ref is not None:
            current = self.read_loose_ref(name)
            if current is None:
                current = self.get_packed_refs().get(name)
                if current is None:
                    # Neither loose nor packed
                    current = ZERO_SHA
            if current != old_ref:
                return False

        # remove the reference file itself
        try:
            present = os.path.lexists(filename)
        except OSError:
            # may only be packed, or otherwise unstorable
            present = False

        if present:
            os.remove(filename)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        lock.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    ancestor = name
    while True:
        try:
            ancestor_bytes, _ = ancestor.rsplit(b"/", 1)
            ancestor = Ref(ancestor_bytes)
        except ValueError:
            # no slash left: reached the top-level component
            break

        if ancestor == b"refs":
            break
        try:
            os.rmdir(self.refpath(ancestor))
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True
def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    Args:
      all: If True, pack all refs. If False, only pack tags.
    """
    selected: dict[Ref, ObjectID | None] = {}
    for refname in self.allkeys():
        # Never pack HEAD
        if refname == HEADREF:
            continue
        if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            sha = self[refname]
        except KeyError:
            # Broken ref, skip it
            continue
        if sha:
            selected[refname] = sha

    if selected:
        self.add_packed_refs(selected)
def _split_ref_line(line: bytes) -> tuple[ObjectID, Ref]:
    """Parse one ``<sha> <refname>`` line into a (sha, name) tuple.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the sha is not valid hex, or the ref name
        fails the ref format check
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    sha = parts[0]
    name = parts[1]
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(Ref(name)):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (ObjectID(sha), Ref(name))
def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[ObjectID, Ref]]:
    """Iterate over the entries of a packed refs file without peeled lines.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.

    Raises:
      PackedRefsException: if a peeled (``^``) line is encountered or a
        ref line is malformed
    """
    for raw_line in f:
        if raw_line.startswith(b"#"):
            # Comment
            pass
        elif raw_line.startswith(b"^"):
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        else:
            yield _split_ref_line(raw_line)
def read_packed_refs_with_peeled(
    f: IO[bytes],
) -> Iterator[tuple[ObjectID, Ref, ObjectID | None]]:
    """Iterate over a packed refs file that may contain peeled lines.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of (SHA1, ref name, peeled SHA1 or None).

    Args:
      f: file-like object to read from, seek'ed to the second line

    Raises:
      PackedRefsException: if a peeled line has no preceding ref line, a
        peeled sha is not valid hex, or a ref line is malformed
    """
    # A ref line is held back until the following line shows whether a
    # peeled value belongs to it.
    pending = None
    for raw_line in f:
        if raw_line.startswith(b"#"):
            continue
        entry = raw_line.rstrip(b"\r\n")
        if not entry.startswith(b"^"):
            if pending:
                sha, name = _split_ref_line(pending)
                yield (sha, name, None)
            pending = entry
            continue

        if not pending:
            raise PackedRefsException("unexpected peeled ref line")
        peeled = entry[1:]
        if not valid_hexsha(peeled):
            raise PackedRefsException(f"Invalid hex sha {peeled!r}")
        sha, name = _split_ref_line(pending)
        pending = None
        yield (sha, name, ObjectID(peeled))

    # Flush the final ref line, which had no peeled line after it
    if pending:
        sha, name = _split_ref_line(pending)
        yield (sha, name, None)
def write_packed_refs(
    f: IO[bytes],
    packed_refs: Mapping[Ref, ObjectID],
    peeled_refs: Mapping[Ref, ObjectID] | None = None,
) -> None:
    """Serialize packed refs, sorted by ref name, to a file.

    The "# pack-refs with: peeled" header is written only when
    ``peeled_refs`` is not None (even if it is empty).

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    else:
        peeled_refs = {}
    for refname in sorted(packed_refs):
        f.write(git_line(packed_refs[refname], refname))
        if refname in peeled_refs:
            # The peeled value follows its ref on a "^"-prefixed line
            f.write(b"^" + peeled_refs[refname] + b"\n")
def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]:
    """Parse an info/refs file.

    Args:
      f: File-like object to read from

    Returns:
      Dictionary mapping ref names to SHA1s

    Raises:
      ValueError: if a line does not contain a tab separating the sha
        from the ref name
    """
    result: dict[Ref, ObjectID] = {}
    for line_no, raw_line in enumerate(f.readlines(), 1):
        stripped = raw_line.rstrip(b"\r\n")
        # Split on the first tab only
        sha, tab, name = stripped.partition(b"\t")
        if not tab:
            raise ValueError(
                f"Invalid info/refs format at line {line_no}: "
                f"expected '<sha>\\t<refname>', got {stripped[:100]!r}"
            )
        result[Ref(name)] = ObjectID(sha)
    return result
def is_local_branch(x: bytes) -> bool:
    """Return True if ``x`` starts with the local branch prefix."""
    prefix = LOCAL_BRANCH_PREFIX
    return x.startswith(prefix)
1499def _strip_leading_slash(name: bytes, kind: str) -> bytes:
1500 """Strip a leading slash from a short ref name, warning if one is present.
1502 A leading slash here means the caller stripped a ref prefix incorrectly
1503 (e.g. used ``ref[len(b"refs/tags"):]`` instead of ``len(b"refs/tags/")``).
1504 Joining such a name with a prefix would produce a malformed ref with an
1505 empty path component. Warn rather than raise for now; this will become an
1506 error in a future release.
1507 """
1508 if not name.startswith(b"/"):
1509 return name
1510 warnings.warn(
1511 f"{kind} name must not start with a slash: {name!r}; "
1512 "this will be rejected in a future version of Dulwich.",
1513 DeprecationWarning,
1514 stacklevel=3,
1515 )
1516 return name.lstrip(b"/")
def local_branch_name(name: bytes) -> Ref:
    """Return the full branch ref for a short branch name.

    A name that already starts with the local branch prefix is returned
    as is.

    Args:
      name: Short branch name (e.g., b"master") or full ref

    Returns:
      Full branch ref name (e.g., b"refs/heads/master")

    Examples:
      >>> local_branch_name(b"master")
      b'refs/heads/master'
      >>> local_branch_name(b"refs/heads/master")
      b'refs/heads/master'
    """
    already_full = name.startswith(LOCAL_BRANCH_PREFIX)
    if not already_full:
        name = LOCAL_BRANCH_PREFIX + _strip_leading_slash(name, "Branch")
    return Ref(name)
def local_tag_name(name: bytes) -> Ref:
    """Return the full tag ref for a short tag name.

    A name that already starts with the local tag prefix is returned as is.

    Args:
      name: Short tag name (e.g., b"v1.0") or full ref

    Returns:
      Full tag ref name (e.g., b"refs/tags/v1.0")

    Examples:
      >>> local_tag_name(b"v1.0")
      b'refs/tags/v1.0'
      >>> local_tag_name(b"refs/tags/v1.0")
      b'refs/tags/v1.0'
    """
    already_full = name.startswith(LOCAL_TAG_PREFIX)
    if not already_full:
        name = LOCAL_TAG_PREFIX + _strip_leading_slash(name, "Tag")
    return Ref(name)
def local_replace_name(name: bytes) -> Ref:
    """Return the full replace ref for a short name.

    A name that already starts with the local replace prefix is returned
    as is.

    Args:
      name: Short replace name (object SHA) or full ref

    Returns:
      Full replace ref name (e.g., b"refs/replace/<sha>")

    Examples:
      >>> local_replace_name(b"abc123")
      b'refs/replace/abc123'
      >>> local_replace_name(b"refs/replace/abc123")
      b'refs/replace/abc123'
    """
    already_full = name.startswith(LOCAL_REPLACE_PREFIX)
    if not already_full:
        name = LOCAL_REPLACE_PREFIX + _strip_leading_slash(name, "Replace")
    return Ref(name)
def extract_branch_name(ref: bytes) -> bytes:
    """Return the short branch name of a full branch ref.

    Args:
      ref: Full branch ref (e.g., b"refs/heads/master")

    Returns:
      Short branch name (e.g., b"master")

    Raises:
      ValueError: If ref is not a local branch

    Examples:
      >>> extract_branch_name(b"refs/heads/master")
      b'master'
      >>> extract_branch_name(b"refs/heads/feature/foo")
      b'feature/foo'
    """
    if ref.startswith(LOCAL_BRANCH_PREFIX):
        prefix_length = len(LOCAL_BRANCH_PREFIX)
        return ref[prefix_length:]
    raise ValueError(f"Not a local branch ref: {ref!r}")
def extract_tag_name(ref: bytes) -> bytes:
    """Return the short tag name of a full tag ref.

    Args:
      ref: Full tag ref (e.g., b"refs/tags/v1.0")

    Returns:
      Short tag name (e.g., b"v1.0")

    Raises:
      ValueError: If ref is not a local tag

    Examples:
      >>> extract_tag_name(b"refs/tags/v1.0")
      b'v1.0'
    """
    if ref.startswith(LOCAL_TAG_PREFIX):
        prefix_length = len(LOCAL_TAG_PREFIX)
        return ref[prefix_length:]
    raise ValueError(f"Not a local tag ref: {ref!r}")
def shorten_ref_name(ref: bytes) -> bytes:
    """Strip the branch, remote or tag prefix from a full ref name.

    Refs that start with none of those prefixes are returned unchanged.

    Args:
      ref: Full ref name (e.g., b"refs/heads/master")

    Returns:
      Short ref name (e.g., b"master")

    Examples:
      >>> shorten_ref_name(b"refs/heads/master")
      b'master'
      >>> shorten_ref_name(b"refs/remotes/origin/main")
      b'origin/main'
      >>> shorten_ref_name(b"refs/tags/v1.0")
      b'v1.0'
      >>> shorten_ref_name(b"HEAD")
      b'HEAD'
    """
    # Checked in the same order as before: branches, remotes, then tags
    for prefix in (LOCAL_BRANCH_PREFIX, LOCAL_REMOTE_PREFIX, LOCAL_TAG_PREFIX):
        if ref.startswith(prefix):
            return ref[len(prefix) :]
    return ref
def _set_origin_head(
    refs: RefsContainer, origin: bytes, origin_head: bytes | None
) -> None:
    """Point ``refs/remotes/<origin>/HEAD`` at the remote's default branch.

    Nothing is changed unless ``origin_head`` is a local branch ref and the
    matching remote-tracking ref already exists in ``refs``.

    Args:
      refs: Container in which to set the symbolic ref
      origin: Name of the remote
      origin_head: Ref that the remote's HEAD refers to, if known
    """
    if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    origin_base = b"refs/remotes/" + origin + b"/"
    symref_name = Ref(origin_base + HEADREF)
    tracking_ref = Ref(origin_base + extract_branch_name(origin_head))
    if tracking_ref in refs:
        refs.set_symbolic_ref(symref_name, tracking_ref)
def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: bytes | None,
    branch: bytes | None,
    ref_message: bytes | None,
) -> bytes:
    """Set the default branch.

    Args:
      refs: Container in which the local branch ref is created
      origin: Name of the remote
      origin_head: Ref that the remote's HEAD refers to, if known
      branch: Explicitly requested branch or tag name, if any
      ref_message: Reflog message to record when a ref is created

    Returns:
      The ref that HEAD should be set to

    Raises:
      ValueError: if ``branch`` is neither a remote-tracking branch nor a
        tag, or if neither ``branch`` nor ``origin_head`` is provided
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = Ref(origin_base + branch)
        if origin_ref in refs:
            local_ref = Ref(local_branch_name(branch))
            # Pass the message by keyword: the third positional parameter
            # of add_if_new is the committer, not the reflog message.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif Ref(local_tag_name(branch)) in refs:
            head_ref = Ref(local_tag_name(branch))
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = Ref(origin_head)
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = Ref(origin_base + extract_branch_name(origin_head))
        else:
            origin_ref = Ref(origin_head)
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The ref to copy from does not exist; leave the local ref unset
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref
def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: bytes | None
) -> ObjectID | None:
    """Update HEAD to refer to ``head_ref``.

    For a tag ref, HEAD is deleted and then set directly to the value of
    the tag ref. For any other ref, HEAD is made a symbolic ref to it.

    Args:
      refs: Container holding HEAD and ``head_ref``
      head_ref: Ref that HEAD should refer to
      ref_message: Reflog message for the update

    Returns:
      The value HEAD was set to, or None if a KeyError was raised while
      setting HEAD to a non-tag ref
    """
    target = Ref(head_ref)
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            head = refs[target]
            refs.set_symbolic_ref(HEADREF, target)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            head = None
        return head

    # detach HEAD at specified tag
    head = refs[target]
    # NOTE(review): refs lookups appear to yield SHAs rather than Tag
    # objects, so this branch may never run -- confirm before relying on it.
    if isinstance(head, Tag):
        _cls, obj = head.object
        head = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, head, message=ref_message)
    return head
def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: Mapping[Ref, ObjectID | None],
    message: bytes | None = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into a refs container.

    Branches are imported under ``refs/remotes/<remote_name>`` and tags
    under ``refs/tags``.

    Args:
      refs_container: Container to import the refs into
      remote_name: Name of the remote the refs came from
      refs: Mapping of the remote's ref names to their values
      message: Optional message passed on to ``import_refs``
      prune: Passed on as ``prune`` when importing branches
      prune_tags: Passed on as ``prune`` when importing tags
    """
    from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs

    stripped_refs = strip_peeled_refs(refs)

    branches: dict[Ref, ObjectID | None] = {}
    for refname, value in stripped_refs.items():
        if refname.startswith(LOCAL_BRANCH_PREFIX):
            branches[Ref(extract_branch_name(refname))] = value
    remote_base = Ref(b"refs/remotes/" + remote_name.encode())
    refs_container.import_refs(
        remote_base,
        branches,
        message=message,
        prune=prune,
    )

    tags: dict[Ref, ObjectID | None] = {}
    for refname, value in stripped_refs.items():
        if refname.endswith(PEELED_TAG_SUFFIX):
            continue
        if refname.startswith(LOCAL_TAG_PREFIX):
            tags[Ref(extract_tag_name(refname))] = value
    refs_container.import_refs(
        Ref(b"refs/tags"), tags, message=message, prune=prune_tags
    )
class locked_ref:
    """Lock a ref while making modifications.

    Works as a context manager. The lock file is committed on exit only if
    a new value was written with ``set`` or ``set_symbolic_ref``; if the
    block raised, the ref was deleted, or nothing was written, the lock is
    released without touching the ref file.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        """Initialize a locked ref.

        Args:
          refs_container: The DiskRefsContainer to lock the ref in
          refname: The ref name to lock
        """
        self._refs_container = refs_container
        self._refname = refname
        self._file: _GitFile | None = None
        self._realname: Ref | None = None
        self._deleted = False
        # True once content has been written to the lock file
        self._modified = False

    def __enter__(self) -> Self:
        """Enter the context manager and acquire the lock.

        Returns:
          This locked_ref instance

        Raises:
          OSError: If the lock cannot be acquired
        """
        self._refs_container._check_refname(self._refname)
        try:
            realnames, _ = self._refs_container.follow(self._refname)
            self._realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            self._realname = self._refname

        filename = self._refs_container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        self._file = f
        # A freshly acquired lock file has no content yet
        self._modified = False
        return self

    def __exit__(
        self,
        exc_type: type | None,
        exc_value: BaseException | None,
        traceback: types.TracebackType | None,
    ) -> None:
        """Exit the context manager and release the lock.

        Args:
          exc_type: Type of exception if one occurred
          exc_value: Exception instance if one occurred
          traceback: Traceback if an exception occurred
        """
        if self._file:
            # Only commit when a value was actually written; committing an
            # untouched lock file would replace the ref with an empty file.
            if exc_type is not None or self._deleted or not self._modified:
                self._file.abort()
            else:
                self._file.close()

    def get(self) -> bytes | None:
        """Get the current value of the ref.

        Returns:
          The loose ref contents, else the packed value, else None

        Raises:
          RuntimeError: If called outside the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        assert self._realname is not None
        current_ref = self._refs_container.read_loose_ref(self._realname)
        if current_ref is None:
            current_ref = self._refs_container.get_packed_refs().get(
                self._realname, None
            )
        return current_ref

    def ensure_equals(self, expected_value: bytes | None) -> bool:
        """Ensure the ref currently equals the expected value.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        current_value = self.get()
        return current_value == expected_value

    def set(self, new_ref: bytes) -> None:
        """Set the ref to a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value

        Raises:
          RuntimeError: If called outside the context manager
          ValueError: If new_ref is neither a valid hex sha nor a symref
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Overwrite anything written earlier in this context
        self._file.seek(0)
        self._file.truncate()
        self._file.write(new_ref + b"\n")
        self._deleted = False
        self._modified = True

    def set_symbolic_ref(self, target: Ref) -> None:
        """Make this ref point at another ref.

        Args:
          target: Name of the ref to point at

        Raises:
          RuntimeError: If called outside the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        self._refs_container._check_refname(target)
        # Overwrite anything written earlier in this context
        self._file.seek(0)
        self._file.truncate()
        self._file.write(SYMREF + target + b"\n")
        self._deleted = False
        self._modified = True

    def delete(self) -> None:
        """Delete the ref file while holding the lock.

        Raises:
          RuntimeError: If called outside the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        # Delete the actual ref file while holding the lock
        if self._realname:
            filename = self._refs_container.refpath(self._realname)
            try:
                if os.path.lexists(filename):
                    os.remove(filename)
            except FileNotFoundError:
                # Removed between the existence check and the removal
                pass
            self._refs_container._remove_packed_ref(self._realname)

        self._deleted = True
        # Anything written before the delete must not be committed
        self._modified = False
class NamespacedRefsContainer(RefsContainer):
    """Refs container view that prefixes every ref with a namespace.

    This implements Git's GIT_NAMESPACE feature, which stores refs under
    refs/namespaces/<namespace>/ and filters operations to only show refs
    within that namespace.

    Example:
      With namespace "foo", a ref "refs/heads/master" is stored as
      "refs/namespaces/foo/refs/heads/master" in the underlying container.
    """

    def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
        """Initialize NamespacedRefsContainer.

        Args:
          refs: The underlying refs container to wrap
          namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
        """
        super().__init__(logger=refs._logger)
        self._refs = refs
        # Each namespace component gets its own refs/namespaces/<part>/
        # segment, so nested namespaces expand as
        # foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
        self._namespace_prefix = b"".join(
            b"refs/namespaces/" + part + b"/" for part in namespace.split(b"/")
        )

    def _apply_namespace(self, name: bytes) -> bytes:
        """Return ``name`` with the namespace prefix applied."""
        # HEAD and other special refs are not namespaced
        is_special = name == HEADREF or not name.startswith(b"refs/")
        return name if is_special else self._namespace_prefix + name

    def _strip_namespace(self, name: bytes) -> bytes | None:
        """Return ``name`` without the namespace prefix.

        Returns None if the ref is not in our namespace.
        """
        # HEAD and other special refs are not namespaced
        if name == HEADREF or not name.startswith(b"refs/"):
            return name
        prefix = self._namespace_prefix
        return name[len(prefix) :] if name.startswith(prefix) else None

    def allkeys(self) -> set[Ref]:
        """Return all reference keys in this namespace."""
        return {
            Ref(local)
            for local in map(self._strip_namespace, self._refs.allkeys())
            if local is not None
        }

    def read_loose_ref(self, name: Ref) -> bytes | None:
        """Read a loose reference."""
        target = Ref(self._apply_namespace(name))
        return self._refs.read_loose_ref(target)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get packed refs within this namespace."""
        visible: dict[Ref, ObjectID] = {}
        for full_name, sha in self._refs.get_packed_refs().items():
            if (local := self._strip_namespace(full_name)) is not None:
                visible[Ref(local)] = sha
        return visible

    def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
        """Add packed refs with namespace prefix."""
        namespaced: dict[Ref, ObjectID | None] = {}
        for name, value in new_refs.items():
            namespaced[Ref(self._apply_namespace(name))] = value
        self._refs.add_packed_refs(namespaced)

    def get_peeled(self, name: Ref) -> ObjectID | None:
        """Return the cached peeled value of a ref."""
        target = Ref(self._apply_namespace(name))
        return self._refs.get_peeled(target)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Make a ref point at another ref."""
        # Both the symref and its target live inside the namespace
        source = Ref(self._apply_namespace(name))
        target = Ref(self._apply_namespace(other))
        self._refs.set_symbolic_ref(
            source,
            target,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: Ref,
        old_ref: ObjectID | None,
        new_ref: ObjectID,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref."""
        target = Ref(self._apply_namespace(name))
        return self._refs.set_if_equals(
            target,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Add a new reference only if it does not already exist."""
        target = Ref(self._apply_namespace(name))
        return self._refs.add_if_new(
            target,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def remove_if_equals(
        self,
        name: Ref,
        old_ref: ObjectID | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref."""
        target = Ref(self._apply_namespace(name))
        return self._refs.remove_if_equals(
            target,
            old_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Note: This packs all refs in the underlying container, not just
        those in the namespace.
        """
        self._refs.pack_refs(all=all)
def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
    """Filter refs to only include those with a given prefix.

    Args:
      refs: A dictionary of refs.
      prefixes: The prefixes to filter by.

    Returns:
      A new dictionary with the entries of ``refs`` whose key starts with
      at least one of ``prefixes``, in their original order
    """
    # Materialize once: ``prefixes`` may be a one-shot iterator, which
    # would otherwise be exhausted after the first key is tested.
    prefix_tuple = tuple(prefixes)
    filtered = {k: v for k, v in refs.items() if k.startswith(prefix_tuple)}
    return filtered
def is_per_worktree_ref(ref: bytes) -> bool:
    """Tell whether a reference is stored per worktree.

    Per-worktree references are:
     - all pseudorefs, e.g. HEAD
     - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"

    All refs starting with "refs/" are shared, except for the ones listed above.

    See https://git-scm.com/docs/git-worktree#_refs.
    """
    if not ref.startswith(b"refs/"):
        # Anything outside refs/ is a pseudoref
        return True
    per_worktree_prefixes = (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
    return ref.startswith(per_worktree_prefixes)