Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import types
27import warnings
28from collections.abc import Callable, Iterable, Iterator, Mapping
29from contextlib import suppress
30from typing import (
31 IO,
32 TYPE_CHECKING,
33 Any,
34 BinaryIO,
35 TypeVar,
36)
38if TYPE_CHECKING:
39 from .file import _GitFile
41from .errors import PackedRefsException, RefFormatError
42from .file import GitFile, ensure_dir_exists
43from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
44from .pack import ObjectContainer
# Type alias: ref names are handled as raw bytes throughout this module.
Ref = bytes

# Name of the HEAD ref; _check_refname() accepts it even though it does not
# live under "refs/".
HEADREF = b"HEAD"
# Prefix that marks the contents of a ref as symbolic, i.e. pointing at
# another ref rather than at a SHA (see parse_symref_value / follow).
SYMREF = b"ref: "
# Ref namespace prefixes.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
LOCAL_REPLACE_PREFIX = b"refs/replace/"
# Byte values that check_ref_format() rejects anywhere in a ref name.
# Iterating a bytes literal yields ints, so this is a set of ints
# (DEL, space, '~', '^', ':', '?', '*', '[').
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# NOTE(review): presumably the suffix marking the peeled value of a tag ref;
# its consumers are outside this section -- confirm.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
class SymrefLoop(Exception):
    """Raised when following symbolic refs does not terminate.

    The constructor arguments are kept on the instance as ``ref`` and
    ``depth`` so that handlers can report where resolution gave up.
    """

    def __init__(self, ref: bytes, depth: int) -> None:
        """Store the ref being resolved and the depth that was reached."""
        self.ref, self.depth = ref, depth
def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target ref name from the contents of a symbolic ref.

    Args:
      contents: Raw ref contents, expected to start with ``SYMREF``
    Returns: The bytes after the ``SYMREF`` prefix, with trailing CR/LF
      characters removed
    Raises:
      ValueError: if ``contents`` does not start with ``SYMREF``
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")
def check_ref_format(refname: Ref) -> bool:
    """Report whether a refname passes this module's format checks.

    The checks are modelled on git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # Kept as separate guard clauses so each one can be read next to [1].
    if refname.startswith(b".") or b"/." in refname:
        return False
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # Iterating bytes yields ints: reject control bytes (below 0o40) and
    # anything listed in BAD_REF_CHARS.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # refname is known to be non-empty here because it contains b"/".
    if refname.endswith((b"/", b".")):
        return False
    if refname.endswith(b".lock"):
        return False
    return not (b"@{" in refname or b"\\" in refname)
def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); everything after the first
      slash following the remote name belongs to the branch name

    Raises:
      ValueError: If ref does not start with ``LOCAL_REMOTE_PREFIX`` or has
        no slash after the remote name
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    # Cut at the first slash after the prefix; a missing separator means
    # there is no branch component at all.
    remote_name, slash, branch_name = ref[len(LOCAL_REMOTE_PREFIX) :].partition(b"/")
    if not slash:
        raise ValueError(f"Invalid remote ref format: {ref!r}")
    return (remote_name, branch_name)
class RefsContainer:
    """A container for refs.

    Base class: storage-specific operations (``allkeys``, ``read_loose_ref``,
    ``get_packed_refs``, ``set_if_equals``, ``remove_if_equals``, ...) raise
    NotImplementedError here and are supplied by subclasses. The generic
    helpers (``follow``, ``as_dict``, ``import_refs``, the mapping dunders)
    are implemented on top of those primitives.
    """

    def __init__(
        self,
        logger: Callable[
            [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
        ]
        | None = None,
    ) -> None:
        """Initialize RefsContainer with optional logger function.

        Args:
          logger: Optional callable invoked by ``_log`` as
            ``logger(ref, old_sha, new_sha, committer, timestamp, timezone,
            message)``
        """
        self._logger = logger

    def _log(
        self,
        ref: bytes,
        old_sha: bytes | None,
        new_sha: bytes | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Forward a ref change to the logger, if one is configured.

        Nothing is logged when there is no logger or no message.
        """
        if self._logger is None:
            return
        if message is None:
            return
        # Use ZERO_SHA for None values, matching git behavior
        if old_sha is None:
            old_sha = ZERO_SHA
        if new_sha is None:
            new_sha = ZERO_SHA
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name: bytes) -> ObjectID | None:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation keeps no cache and always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: Mapping[Ref, ObjectID | None],
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
        prune: bool = False,
    ) -> None:
        """Import refs from another repository.

        Args:
          base: Base ref to import into (e.g., b'refs/remotes/origin')
          other: Mapping of refs to import; a value of None requests that
            the ref be removed
          committer: Optional committer for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
          prune: If True, remove refs under base that are not in other
        """
        to_delete: set[bytes] = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
                continue
            self.set_if_equals(
                b"/".join((base, name)),
                None,
                value,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
            # The ref was just written, so it must survive pruning. Only
            # refs that were actually set are un-scheduled; explicit None
            # deletions stay scheduled.
            to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> set[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self) -> Iterator[Ref]:
        """Iterate over all reference keys."""
        return iter(self.allkeys())

    def keys(self, base: bytes | None = None) -> set[bytes]:
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        return self.allkeys()

    def subkeys(self, base: bytes) -> set[bytes]:
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; a trailing slash is optional.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        # Match on "<base>/" so that b"refs/heads" does not also pick up
        # b"refs/headsfoo/...", and so that a base passed with a trailing
        # slash does not lose the first character of every name.
        stem = base.rstrip(b"/")
        prefix = stem + b"/" if stem else b""
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base: bytes | None = None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out of the result.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name: bytes) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).
        b"refs/stash" is accepted as a special case as well.

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname: bytes) -> bytes | None:
        """Read a reference without following any references.

        The loose ref is consulted first; packed refs are the fallback.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name: bytes) -> bytes | None:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name: bytes) -> tuple[list[bytes], bytes | None]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        Raises:
          SymrefLoop: if more than five symrefs had to be followed
        """
        # Seed the loop as if `name` itself had been read from a symref.
        contents: bytes | None = SYMREF + name
        depth = 0
        refnames = []
        while contents and contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname: bytes) -> bool:
        """Check if a reference exists."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name: bytes) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference chain does not end in a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name: bytes,
        old_ref: bytes | None,
        new_ref: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name: bytes, ref: bytes) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref value
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: bytes | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name: bytes) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self) -> dict[bytes, bytes]:
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            ref_value = self.read_ref(src)
            if ref_value is None:
                # The ref was listed but can no longer be read; skip it
                # instead of relying on an assert that -O strips.
                continue
            try:
                dst = parse_symref_value(ref_value)
            except ValueError:
                # Not a symbolic ref.
                continue
            ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)
class DictRefsContainer(RefsContainer):
    """RefsContainer backed by a simple dict.

    Ref values live in an in-memory dict and there are no packed refs.
    This container is not threadsafe.
    """

    def __init__(
        self,
        refs: dict[bytes, bytes],
        logger: Callable[
            [
                bytes,
                bytes | None,
                bytes | None,
                bytes | None,
                int | None,
                int | None,
                bytes | None,
            ],
            None,
        ]
        | None = None,
    ) -> None:
        """Wrap ``refs`` (used directly, not copied) with an optional logger."""
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self) -> set[bytes]:
        """Return the names of every stored ref."""
        return set(self._refs)

    def read_loose_ref(self, name: bytes) -> bytes | None:
        """Return the stored value for ``name``, or None when absent."""
        return self._refs.get(name)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Return an empty mapping; this container has no packed refs."""
        return {}

    def _notify(self, ref: bytes, newsha: bytes | None) -> None:
        """Tell every registered watcher about a changed ref."""
        change = (ref, newsha)
        for listener in self._watchers:
            listener._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
        """
        # Resolve the current value first so the log entry can carry it.
        _, previous = self.follow(name)
        symref_value = SYMREF + other
        self._refs[name] = symref_value
        self._notify(name, symref_value)
        self._log(
            name,
            previous,
            symref_value,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: bytes | None,
        new_ref: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        Only the entry stored under ``name`` itself is compared and
        replaced; symbolic references are not followed.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the set was successful, False otherwise.
        """
        if old_ref is not None:
            # A missing ref compares as ZERO_SHA.
            if self._refs.get(name, ZERO_SHA) != old_ref:
                return False
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the add was successful, False otherwise.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: bytes | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. Removing a ref
        that is not present still counts as success.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the delete was successful, False otherwise.
        """
        if old_ref is not None:
            if self._refs.get(name, ZERO_SHA) != old_ref:
                return False
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name: bytes) -> bytes | None:
        """Return the cached peeled value for ``name``, or None if not cached."""
        return self._peeled.get(name, None)

    def _update(self, refs: Mapping[bytes, bytes]) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname in refs:
            self.set_if_equals(refname, None, refs[refname])

    def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file.

    The file is parsed once in the constructor. None of the mutating
    operations of RefsContainer are overridden here.
    """

    def __init__(self, f: BinaryIO) -> None:
        """Initialize InfoRefsContainer from info/refs file.

        Args:
          f: Binary file-like object handed to ``read_info_refs``
        """
        # Run the base initializer so inherited helpers that read
        # self._logger (e.g. _log) find it set to None.
        super().__init__()
        self._refs: dict[bytes, bytes] = {}
        self._peeled: dict[bytes, bytes] = {}
        refs = read_info_refs(f)
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self) -> set[bytes]:
        """Return all reference keys."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> bytes | None:
        """Read a loose reference."""
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Get packed references (always empty for this container)."""
        return {}

    def get_peeled(self, name: bytes) -> bytes | None:
        """Get peeled version of a reference.

        Returns: The recorded peeled value when there is one, otherwise the
            ref's own value.
        Raises:
          KeyError: if ``name`` is not a ref in this container
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]
788class DiskRefsContainer(RefsContainer):
789 """Refs container that reads refs from disk."""
791 def __init__(
792 self,
793 path: str | bytes | os.PathLike[str],
794 worktree_path: str | bytes | os.PathLike[str] | None = None,
795 logger: Callable[
796 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
797 ]
798 | None = None,
799 ) -> None:
800 """Initialize DiskRefsContainer."""
801 super().__init__(logger=logger)
802 # Convert path-like objects to strings, then to bytes for Git compatibility
803 self.path = os.fsencode(os.fspath(path))
804 if worktree_path is None:
805 self.worktree_path = self.path
806 else:
807 self.worktree_path = os.fsencode(os.fspath(worktree_path))
808 self._packed_refs: dict[bytes, bytes] | None = None
809 self._peeled_refs: dict[bytes, bytes] | None = None
811 def __repr__(self) -> str:
812 """Return string representation of DiskRefsContainer."""
813 return f"{self.__class__.__name__}({self.path!r})"
815 def _iter_dir(
816 self,
817 path: bytes,
818 base: bytes,
819 dir_filter: Callable[[bytes], bool] | None = None,
820 ) -> Iterator[bytes]:
821 refspath = os.path.join(path, base.rstrip(b"/"))
822 prefix_len = len(os.path.join(path, b""))
824 for root, dirs, files in os.walk(refspath):
825 directory = root[prefix_len:]
826 if os.path.sep != "/":
827 directory = directory.replace(os.fsencode(os.path.sep), b"/")
828 if dir_filter is not None:
829 dirs[:] = [
830 d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
831 ]
833 for filename in files:
834 refname = b"/".join([directory, filename])
835 if check_ref_format(refname):
836 yield refname
838 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
839 base = base.rstrip(b"/") + b"/"
840 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = []
841 if base != b"refs/":
842 path = self.worktree_path if is_per_worktree_ref(base) else self.path
843 search_paths.append((path, None))
844 elif self.worktree_path == self.path:
845 # Iterate through all the refs from the main worktree
846 search_paths.append((self.path, None))
847 else:
848 # Iterate through all the shared refs from the commondir, excluding per-worktree refs
849 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
850 # Iterate through all the per-worktree refs from the worktree's gitdir
851 search_paths.append((self.worktree_path, is_per_worktree_ref))
853 for path, dir_filter in search_paths:
854 yield from self._iter_dir(path, base, dir_filter=dir_filter)
856 def subkeys(self, base: bytes) -> set[bytes]:
857 """Return subkeys under a given base reference path."""
858 subkeys = set()
860 for key in self._iter_loose_refs(base):
861 if key.startswith(base):
862 subkeys.add(key[len(base) :].strip(b"/"))
864 for key in self.get_packed_refs():
865 if key.startswith(base):
866 subkeys.add(key[len(base) :].strip(b"/"))
867 return subkeys
869 def allkeys(self) -> set[bytes]:
870 """Return all reference keys."""
871 allkeys = set()
872 if os.path.exists(self.refpath(HEADREF)):
873 allkeys.add(HEADREF)
875 allkeys.update(self._iter_loose_refs())
876 allkeys.update(self.get_packed_refs())
877 return allkeys
879 def refpath(self, name: bytes) -> bytes:
880 """Return the disk path of a ref."""
881 path = name
882 if os.path.sep != "/":
883 path = path.replace(b"/", os.fsencode(os.path.sep))
885 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
886 return os.path.join(root_dir, path)
888 def get_packed_refs(self) -> dict[bytes, bytes]:
889 """Get contents of the packed-refs file.
891 Returns: Dictionary mapping ref names to SHA1s
893 Note: Will return an empty dictionary when no packed-refs file is
894 present.
895 """
896 # TODO: invalidate the cache on repacking
897 if self._packed_refs is None:
898 # set both to empty because we want _peeled_refs to be
899 # None if and only if _packed_refs is also None.
900 self._packed_refs = {}
901 self._peeled_refs = {}
902 path = os.path.join(self.path, b"packed-refs")
903 try:
904 f = GitFile(path, "rb")
905 except FileNotFoundError:
906 return {}
907 with f:
908 first_line = next(iter(f)).rstrip()
909 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
910 for sha, name, peeled in read_packed_refs_with_peeled(f):
911 self._packed_refs[name] = sha
912 if peeled:
913 self._peeled_refs[name] = peeled
914 else:
915 f.seek(0)
916 for sha, name in read_packed_refs(f):
917 self._packed_refs[name] = sha
918 return self._packed_refs
920 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
921 """Add the given refs as packed refs.
923 Args:
924 new_refs: A mapping of ref names to targets; if a target is None that
925 means remove the ref
926 """
927 if not new_refs:
928 return
930 path = os.path.join(self.path, b"packed-refs")
932 with GitFile(path, "wb") as f:
933 # reread cached refs from disk, while holding the lock
934 packed_refs = self.get_packed_refs().copy()
936 for ref, target in new_refs.items():
937 # sanity check
938 if ref == HEADREF:
939 raise ValueError("cannot pack HEAD")
941 # remove any loose refs pointing to this one -- please
942 # note that this bypasses remove_if_equals as we don't
943 # want to affect packed refs in here
944 with suppress(OSError):
945 os.remove(self.refpath(ref))
947 if target is not None:
948 packed_refs[ref] = target
949 else:
950 packed_refs.pop(ref, None)
952 write_packed_refs(f, packed_refs, self._peeled_refs)
954 self._packed_refs = packed_refs
956 def get_peeled(self, name: bytes) -> bytes | None:
957 """Return the cached peeled value of a ref, if available.
959 Args:
960 name: Name of the ref to peel
961 Returns: The peeled value of the ref. If the ref is known not point to
962 a tag, this will be the SHA the ref refers to. If the ref may point
963 to a tag, but no cached information is available, None is returned.
964 """
965 self.get_packed_refs()
966 if (
967 self._peeled_refs is None
968 or self._packed_refs is None
969 or name not in self._packed_refs
970 ):
971 # No cache: no peeled refs were read, or this ref is loose
972 return None
973 if name in self._peeled_refs:
974 return self._peeled_refs[name]
975 else:
976 # Known not peelable
977 return self[name]
979 def read_loose_ref(self, name: bytes) -> bytes | None:
980 """Read a reference file and return its contents.
982 If the reference file a symbolic reference, only read the first line of
983 the file. Otherwise, only read the first 40 bytes.
985 Args:
986 name: the refname to read, relative to refpath
987 Returns: The contents of the ref file, or None if the file does not
988 exist.
990 Raises:
991 IOError: if any other error occurs
992 """
993 filename = self.refpath(name)
994 try:
995 with GitFile(filename, "rb") as f:
996 header = f.read(len(SYMREF))
997 if header == SYMREF:
998 # Read only the first line
999 return header + next(iter(f)).rstrip(b"\r\n")
1000 else:
1001 # Read only the first 40 bytes
1002 return header + f.read(40 - len(SYMREF))
1003 except (OSError, UnicodeError):
1004 # don't assume anything specific about the error; in
1005 # particular, invalid or forbidden paths can raise weird
1006 # errors depending on the specific operating system
1007 return None
    def _remove_packed_ref(self, name: bytes) -> None:
        """Drop ``name`` from the packed-refs file and from the caches.

        Does nothing when the packed-refs cache has never been loaded.

        Args:
          name: The refname to remove from packed refs
        """
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            # Clearing the cache forces get_packed_refs() to parse the file.
            self._packed_refs = None
            self.get_packed_refs()

            if self._packed_refs is None or name not in self._packed_refs:
                # Nothing to remove: abandon the write.
                f.abort()
                return

            del self._packed_refs[name]
            if self._peeled_refs is not None:
                with suppress(KeyError):
                    del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        except BaseException:
            # Any failure, including KeyboardInterrupt, abandons the write.
            f.abort()
            raise
    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message to describe the change
        Raises:
          RefFormatError: if ``name`` or ``other`` fails ``_check_refname``
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, "wb")
        try:
            f.write(SYMREF + other + b"\n")
            # NOTE(review): follow() runs before f.close(); whether it sees
            # the new target depends on GitFile semantics not visible here.
            # The one resolved value is logged as both old and new sha --
            # confirm this is intended.
            sha = self.follow(name)[-1]
            self._log(
                name,
                sha,
                sha,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        except BaseException:
            # Abandon the write on any failure, then propagate.
            f.abort()
            raise
        else:
            f.close()
    def set_if_equals(
        self,
        name: bytes,
        old_ref: bytes | None,
        new_ref: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.
        Raises:
          RefFormatError: if ``name`` fails ``_check_refname``
          NotADirectoryError: if a parent path of the target ref is itself
            a packed ref
          OSError: if reading or writing the ref file fails
        """
        self._check_refname(name)
        try:
            # Write to the last ref in the symref chain.
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            # The chain cannot be resolved: fall back to the name as given.
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock to handle race conditions
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        # No loose value: compare against the packed value,
                        # with ZERO_SHA standing in for "does not exist".
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise

            # Check if ref already has the desired value while holding the lock
            # This avoids fsync when ref is unchanged but still detects lock conflicts
            current_ref = self.read_loose_ref(realname)
            if current_ref is None:
                current_ref = packed_refs.get(realname, None)

            if current_ref is not None and current_ref == new_ref:
                # Ref already has desired value, abort write to avoid fsync
                f.abort()
                return True

            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            # The entry records the caller-supplied old_ref (None is turned
            # into ZERO_SHA by _log), not the value read from disk above.
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True
1158 def add_if_new(
1159 self,
1160 name: bytes,
1161 ref: bytes,
1162 committer: bytes | None = None,
1163 timestamp: int | None = None,
1164 timezone: int | None = None,
1165 message: bytes | None = None,
1166 ) -> bool:
1167 """Add a new reference only if it does not already exist.
1169 This method follows symrefs, and only ensures that the last ref in the
1170 chain does not exist.
1172 Args:
1173 name: The refname to set.
1174 ref: The new sha the refname will refer to.
1175 committer: Optional committer name
1176 timestamp: Optional timestamp
1177 timezone: Optional timezone
1178 message: Optional message for reflog
1179 Returns: True if the add was successful, False otherwise.
1180 """
1181 try:
1182 realnames, contents = self.follow(name)
1183 if contents is not None:
1184 return False
1185 realname = realnames[-1]
1186 except (KeyError, IndexError):
1187 realname = name
1188 self._check_refname(realname)
1189 filename = self.refpath(realname)
1190 ensure_dir_exists(os.path.dirname(filename))
1191 with GitFile(filename, "wb") as f:
1192 if os.path.exists(filename) or name in self.get_packed_refs():
1193 f.abort()
1194 return False
1195 try:
1196 f.write(ref + b"\n")
1197 except OSError:
1198 f.abort()
1199 raise
1200 else:
1201 self._log(
1202 name,
1203 None,
1204 ref,
1205 committer=committer,
1206 timestamp=timestamp,
1207 timezone=timezone,
1208 message=message,
1209 )
1210 return True
1212 def remove_if_equals(
1213 self,
1214 name: bytes,
1215 old_ref: bytes | None,
1216 committer: bytes | None = None,
1217 timestamp: int | None = None,
1218 timezone: int | None = None,
1219 message: bytes | None = None,
1220 ) -> bool:
1221 """Remove a refname only if it currently equals old_ref.
1223 This method does not follow symbolic references. It can be used to
1224 perform an atomic compare-and-delete operation.
1226 Args:
1227 name: The refname to delete.
1228 old_ref: The old sha the refname must refer to, or None to
1229 delete unconditionally.
1230 committer: Optional committer name
1231 timestamp: Optional timestamp
1232 timezone: Optional timezone
1233 message: Optional message
1234 Returns: True if the delete was successful, False otherwise.
1235 """
1236 self._check_refname(name)
1237 filename = self.refpath(name)
1238 ensure_dir_exists(os.path.dirname(filename))
1239 f = GitFile(filename, "wb")
1240 try:
1241 if old_ref is not None:
1242 orig_ref = self.read_loose_ref(name)
1243 if orig_ref is None:
1244 orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
1245 if orig_ref != old_ref:
1246 return False
1248 # remove the reference file itself
1249 try:
1250 found = os.path.lexists(filename)
1251 except OSError:
1252 # may only be packed, or otherwise unstorable
1253 found = False
1255 if found:
1256 os.remove(filename)
1258 self._remove_packed_ref(name)
1259 self._log(
1260 name,
1261 old_ref,
1262 None,
1263 committer=committer,
1264 timestamp=timestamp,
1265 timezone=timezone,
1266 message=message,
1267 )
1268 finally:
1269 # never write, we just wanted the lock
1270 f.abort()
1272 # outside of the lock, clean-up any parent directory that might now
1273 # be empty. this ensures that re-creating a reference of the same
1274 # name of what was previously a directory works as expected
1275 parent = name
1276 while True:
1277 try:
1278 parent, _ = parent.rsplit(b"/", 1)
1279 except ValueError:
1280 break
1282 if parent == b"refs":
1283 break
1284 parent_filename = self.refpath(parent)
1285 try:
1286 os.rmdir(parent_filename)
1287 except OSError:
1288 # this can be caused by the parent directory being
1289 # removed by another process, being not empty, etc.
1290 # in any case, this is non fatal because we already
1291 # removed the reference, just ignore it
1292 break
1294 return True
def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    Args:
      all: When True every ref except HEAD is packed; when False only refs
        under the tag prefix are.
    """
    packable: dict[Ref, ObjectID | None] = {}
    for refname in self.allkeys():
        # HEAD is never packed.
        if refname == HEADREF:
            continue
        if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            target = self[refname]
        except KeyError:
            # Broken ref: leave it alone.
            continue
        if target:
            packable[refname] = target

    if packable:
        self.add_packed_refs(packable)
def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
    """Parse one ``<sha> <name>`` line and return it as ``(sha, name)``.

    Raises:
      PackedRefsException: If the line does not consist of exactly two
        space-separated fields, the sha is not valid hex, or the name fails
        the ref format check.
    """
    stripped = line.rstrip(b"\n\r")
    # Exactly one separator means exactly two fields.
    if stripped.count(b" ") != 1:
        raise PackedRefsException(f"invalid ref line {line!r}")
    sha, _, name = stripped.partition(b" ")
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(name):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (sha, name)
def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
    """Iterate over the entries of a packed refs file without peeled lines.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.

    Raises:
      PackedRefsException: If a peeled (``^``) line is encountered.
    """
    for raw in f:
        marker = raw[:1]
        if marker == b"#":
            # Comment line
            continue
        if marker == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(raw)
1349def read_packed_refs_with_peeled(
1350 f: IO[bytes],
1351) -> Iterator[tuple[bytes, bytes, bytes | None]]:
1352 """Read a packed refs file including peeled refs.
1354 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
1355 with ref names, SHA1s, and peeled SHA1s (or None).
1357 Args:
1358 f: file-like object to read from, seek'ed to the second line
1359 """
1360 last = None
1361 for line in f:
1362 if line.startswith(b"#"):
1363 continue
1364 line = line.rstrip(b"\r\n")
1365 if line.startswith(b"^"):
1366 if not last:
1367 raise PackedRefsException("unexpected peeled ref line")
1368 if not valid_hexsha(line[1:]):
1369 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
1370 sha, name = _split_ref_line(last)
1371 last = None
1372 yield (sha, name, line[1:])
1373 else:
1374 if last:
1375 sha, name = _split_ref_line(last)
1376 yield (sha, name, None)
1377 last = line
1378 if last:
1379 sha, name = _split_ref_line(last)
1380 yield (sha, name, None)
1383def write_packed_refs(
1384 f: IO[bytes],
1385 packed_refs: Mapping[bytes, bytes],
1386 peeled_refs: Mapping[bytes, bytes] | None = None,
1387) -> None:
1388 """Write a packed refs file.
1390 Args:
1391 f: empty file-like object to write to
1392 packed_refs: dict of refname to sha of packed refs to write
1393 peeled_refs: dict of refname to peeled value of sha
1394 """
1395 if peeled_refs is None:
1396 peeled_refs = {}
1397 else:
1398 f.write(b"# pack-refs with: peeled\n")
1399 for refname in sorted(packed_refs.keys()):
1400 f.write(git_line(packed_refs[refname], refname))
1401 if refname in peeled_refs:
1402 f.write(b"^" + peeled_refs[refname] + b"\n")
def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
    """Read info/refs file.

    Each non-empty line is expected to be ``<sha>\\t<name>``. Blank lines
    are ignored rather than triggering an unpacking error.

    Args:
      f: File-like object to read from

    Returns:
      Dictionary mapping ref names to SHA1s

    Raises:
      ValueError: If a non-empty line contains no tab separator.
    """
    ret: dict[bytes, bytes] = {}
    for line in f.readlines():
        line = line.rstrip(b"\r\n")
        if not line:
            # Tolerate blank lines (e.g. a trailing empty line).
            continue
        (sha, name) = line.split(b"\t", 1)
        ret[name] = sha
    return ret
def write_info_refs(
    refs: Mapping[bytes, bytes], store: ObjectContainer
) -> Iterator[bytes]:
    """Yield the lines of an info/refs file for ``refs``.

    Refs are emitted in sorted order. HEAD is left out, as are refs whose
    sha is not present in ``store``. When peeling a ref yields a different
    object, a second line with the peeled-tag suffix is emitted for it.
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for refname, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but it must not be
        # advertised
        if refname == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        peeled = peel_sha(store, sha)[1]
        yield b"".join((obj.id, b"\t", refname, b"\n"))
        if obj.id != peeled.id:
            yield b"".join((peeled.id, b"\t", refname, PEELED_TAG_SUFFIX, b"\n"))
def is_local_branch(x: bytes) -> bool:
    """Return True when the ref name begins with the local branch prefix."""
    prefix = LOCAL_BRANCH_PREFIX
    return x[: len(prefix)] == prefix
def local_branch_name(name: bytes) -> bytes:
    """Return the full branch ref for a short or already-full branch name.

    Args:
      name: Short branch name (e.g., b"master") or full ref

    Returns:
      Full branch ref name (e.g., b"refs/heads/master")

    Examples:
      >>> local_branch_name(b"master")
      b'refs/heads/master'
      >>> local_branch_name(b"refs/heads/master")
      b'refs/heads/master'
    """
    # Names that already carry the prefix are returned unchanged.
    return name if name.startswith(LOCAL_BRANCH_PREFIX) else LOCAL_BRANCH_PREFIX + name
def local_tag_name(name: bytes) -> bytes:
    """Return the full tag ref for a short or already-full tag name.

    Args:
      name: Short tag name (e.g., b"v1.0") or full ref

    Returns:
      Full tag ref name (e.g., b"refs/tags/v1.0")

    Examples:
      >>> local_tag_name(b"v1.0")
      b'refs/tags/v1.0'
      >>> local_tag_name(b"refs/tags/v1.0")
      b'refs/tags/v1.0'
    """
    # Names that already carry the prefix are returned unchanged.
    return name if name.startswith(LOCAL_TAG_PREFIX) else LOCAL_TAG_PREFIX + name
def local_replace_name(name: bytes) -> bytes:
    """Return the full replace ref for a short or already-full name.

    Args:
      name: Short replace name (object SHA) or full ref

    Returns:
      Full replace ref name (e.g., b"refs/replace/<sha>")

    Examples:
      >>> local_replace_name(b"abc123")
      b'refs/replace/abc123'
      >>> local_replace_name(b"refs/replace/abc123")
      b'refs/replace/abc123'
    """
    # Names that already carry the prefix are returned unchanged.
    return (
        name
        if name.startswith(LOCAL_REPLACE_PREFIX)
        else LOCAL_REPLACE_PREFIX + name
    )
def extract_branch_name(ref: bytes) -> bytes:
    """Strip the local branch prefix from a full branch ref.

    Args:
      ref: Full branch ref (e.g., b"refs/heads/master")

    Returns:
      Short branch name (e.g., b"master")

    Raises:
      ValueError: If ref is not a local branch

    Examples:
      >>> extract_branch_name(b"refs/heads/master")
      b'master'
      >>> extract_branch_name(b"refs/heads/feature/foo")
      b'feature/foo'
    """
    prefix = LOCAL_BRANCH_PREFIX
    if ref.startswith(prefix):
        return ref[len(prefix) :]
    raise ValueError(f"Not a local branch ref: {ref!r}")
def extract_tag_name(ref: bytes) -> bytes:
    """Strip the local tag prefix from a full tag ref.

    Args:
      ref: Full tag ref (e.g., b"refs/tags/v1.0")

    Returns:
      Short tag name (e.g., b"v1.0")

    Raises:
      ValueError: If ref is not a local tag

    Examples:
      >>> extract_tag_name(b"refs/tags/v1.0")
      b'v1.0'
    """
    prefix = LOCAL_TAG_PREFIX
    if ref.startswith(prefix):
        return ref[len(prefix) :]
    raise ValueError(f"Not a local tag ref: {ref!r}")
def shorten_ref_name(ref: bytes) -> bytes:
    """Return the short form of a full ref name.

    The branch, remote and tag prefixes are tried in that order; a ref that
    carries none of them is returned unchanged.

    Args:
      ref: Full ref name (e.g., b"refs/heads/master")

    Returns:
      Short ref name (e.g., b"master")

    Examples:
      >>> shorten_ref_name(b"refs/heads/master")
      b'master'
      >>> shorten_ref_name(b"refs/remotes/origin/main")
      b'origin/main'
      >>> shorten_ref_name(b"refs/tags/v1.0")
      b'v1.0'
      >>> shorten_ref_name(b"HEAD")
      b'HEAD'
    """
    for prefix in (LOCAL_BRANCH_PREFIX, LOCAL_REMOTE_PREFIX, LOCAL_TAG_PREFIX):
        if ref.startswith(prefix):
            return ref[len(prefix) :]
    return ref
# Constrained type variable for ref dictionaries: either every ref maps to a
# sha (bytes), or values may also be None. Functions annotated with T return
# the same flavour of dictionary they were given.
T = TypeVar("T", dict[bytes, bytes], dict[bytes, bytes | None])
def strip_peeled_refs(refs: T) -> T:
    """Return a copy of ``refs`` without entries carrying the peeled-tag suffix."""
    kept = {}
    for name, sha in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = sha
    return kept
def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
    """Separate regular refs from peeled ones.

    Returns:
      A ``(regular, peeled)`` pair. ``peeled`` is keyed by the ref name with
      the peeled-tag suffix removed and omits entries whose value is None.
    """
    regular = {}
    peeled: dict[bytes, bytes] = {}
    suffix_len = len(PEELED_TAG_SUFFIX)

    for name, sha in refs.items():
        if not name.endswith(PEELED_TAG_SUFFIX):
            regular[name] = sha
        elif sha is not None:
            # Peeled entries without a value are dropped.
            peeled[name[:-suffix_len]] = sha

    return regular, peeled
def _set_origin_head(
    refs: RefsContainer, origin: bytes, origin_head: bytes | None
) -> None:
    """Make ``refs/remotes/<origin>/HEAD`` a symref to the remote's branch.

    Nothing happens unless ``origin_head`` is a local branch ref whose
    remote-tracking counterpart is present in ``refs``.
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"
    if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    remote_head = remote_prefix + HEADREF
    tracking_ref = remote_prefix + extract_branch_name(origin_head)
    if tracking_ref in refs:
        refs.set_symbolic_ref(remote_head, tracking_ref)
def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: bytes | None,
    branch: bytes | None,
    ref_message: bytes | None,
) -> bytes:
    """Set the default branch.

    When ``branch`` is given, it is looked up first as a remote-tracking
    branch of ``origin`` (creating the matching local branch if it is new)
    and then as a local tag. Otherwise ``origin_head`` is used and the ref
    it names is created from its remote-tracking counterpart when that
    exists.

    Args:
      refs: Refs container to read from and update
      origin: Name of the remote
      origin_head: Ref the remote's HEAD points at, if known
      branch: Explicitly requested branch or tag name, if any
      ref_message: Reflog message for any ref that gets created

    Returns:
      The ref name that HEAD should be set to.

    Raises:
      ValueError: If ``branch`` is neither a remote branch nor a tag, or if
        neither ``branch`` nor ``origin_head`` is provided.
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = local_branch_name(branch)
            # The message has to be passed by keyword: the third positional
            # parameter of add_if_new() is the committer, not the message.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif local_tag_name(branch) in refs:
            head_ref = local_tag_name(branch)
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + extract_branch_name(origin_head)
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The source ref is missing: leave head_ref uncreated.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref
def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: bytes | None
) -> bytes | None:
    """Update HEAD for ``head_ref`` and return the value HEAD resolves to.

    For a ref under the tag prefix, HEAD is deleted and then set directly to
    the tag's value. For any other ref, HEAD is made a symbolic ref to
    ``head_ref`` and then set to that ref's value.

    Args:
      refs: Refs container to update
      head_ref: Ref that HEAD should follow or be detached at
      ref_message: Reflog message for the HEAD update

    Returns:
      The value read from ``refs[head_ref]``, or None when a non-tag
      ``head_ref`` is missing from ``refs``.
    """
    if head_ref.startswith(LOCAL_TAG_PREFIX):
        # detach HEAD at specified tag
        head = refs[head_ref]
        # NOTE(review): this branch only runs if refs[...] returns a Tag
        # object rather than a sha; it also calls get_object on the unpacked
        # ``obj`` itself. Confirm whether this is reachable/intended.
        if isinstance(head, Tag):
            _cls, obj = head.object
            head = obj.get_object(obj).id
        # Remove the existing HEAD first, then write the value unconditionally.
        del refs[HEADREF]
        refs.set_if_equals(HEADREF, None, head, message=ref_message)
    else:
        # set HEAD to specific branch
        try:
            # Look the ref up first so a missing ref leaves HEAD untouched.
            head = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            head = None
    return head
def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, bytes | None],
    message: bytes | None = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into ``refs_container``.

    Branches are imported below ``refs/remotes/<remote_name>`` and tags
    below the local tag prefix. Peeled entries and refs whose value is None
    are left out.

    Args:
      refs_container: Container receiving the refs
      remote_name: Name of the remote
      refs: Refs advertised by the remote
      message: Optional reflog message
      prune: Passed to ``import_refs`` for the branch import
      prune_tags: Passed to ``import_refs`` for the tag import
    """
    unpeeled = strip_peeled_refs(refs)

    remote_branches = {}
    for refname, sha in unpeeled.items():
        if refname.startswith(LOCAL_BRANCH_PREFIX) and sha is not None:
            remote_branches[extract_branch_name(refname)] = sha
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        remote_branches,
        message=message,
        prune=prune,
    )

    remote_tags = {}
    for refname, sha in unpeeled.items():
        if (
            refname.startswith(LOCAL_TAG_PREFIX)
            and not refname.endswith(PEELED_TAG_SUFFIX)
            and sha is not None
        ):
            remote_tags[extract_tag_name(refname)] = sha
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, remote_tags, message=message, prune=prune_tags
    )
def serialize_refs(
    store: ObjectContainer, refs: Mapping[bytes, bytes]
) -> dict[bytes, bytes]:
    """Build a ref dictionary that also carries peeled entries for tags.

    Refs whose sha cannot be peeled because it is missing from ``store``
    are skipped with a ``UserWarning``.

    Args:
      store: Object store to peel refs from
      refs: Dictionary of ref names to SHAs

    Returns:
      Dictionary with refs and peeled refs (marked with ^{})
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for refname, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            warnings.warn(
                "ref {} points at non-present sha {}".format(
                    refname.decode("utf-8", "replace"), sha.decode("ascii")
                ),
                UserWarning,
            )
            continue
        # The peeled entry, when present, is inserted before the ref itself.
        if isinstance(unpeeled, Tag):
            serialized[refname + PEELED_TAG_SUFFIX] = peeled.id
        serialized[refname] = unpeeled.id
    return serialized
class locked_ref:
    """Hold the lock on one ref for the duration of a ``with`` block.

    Entering resolves the ref through any symrefs and opens a ``GitFile``
    for the resolved name. Leaving the block closes that file, or aborts it
    when an exception occurred or the ref was deleted.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        """Remember which ref to lock; nothing is locked until ``__enter__``.

        Args:
          refs_container: The DiskRefsContainer to lock the ref in
          refname: The ref name to lock
        """
        self._refs_container = refs_container
        self._refname = refname
        self._file: _GitFile | None = None
        self._realname: Ref | None = None
        self._deleted = False

    def _locked_file(self) -> "_GitFile":
        """Return the open lock file, or raise if the lock is not held."""
        if not self._file:
            raise RuntimeError("locked_ref not in context")
        return self._file

    def __enter__(self) -> "locked_ref":
        """Acquire the lock and return this instance.

        Returns:
          This locked_ref instance

        Raises:
          OSError: If the lock cannot be acquired
        """
        container = self._refs_container
        container._check_refname(self._refname)
        try:
            chain, _ = container.follow(self._refname)
            self._realname = chain[-1]
        except (KeyError, IndexError, SymrefLoop):
            # Unresolvable chain: lock the name as given.
            self._realname = self._refname

        path = container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(path))
        self._file = GitFile(path, "wb")
        return self

    def __exit__(
        self,
        exc_type: type | None,
        exc_value: BaseException | None,
        traceback: types.TracebackType | None,
    ) -> None:
        """Release the lock, closing or aborting the lock file.

        Args:
          exc_type: Type of exception if one occurred
          exc_value: Exception instance if one occurred
          traceback: Traceback if an exception occurred
        """
        lock = self._file
        if not lock:
            return
        if exc_type is None and not self._deleted:
            lock.close()
        else:
            lock.abort()

    def get(self) -> bytes | None:
        """Return the ref's current value: loose first, then packed."""
        self._locked_file()

        assert self._realname is not None
        container = self._refs_container
        value = container.read_loose_ref(self._realname)
        if value is not None:
            return value
        return container.get_packed_refs().get(self._realname, None)

    def ensure_equals(self, expected_value: bytes | None) -> bool:
        """Report whether the ref currently holds ``expected_value``.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        return self.get() == expected_value

    def set(self, new_ref: bytes) -> None:
        """Replace the pending contents of the ref with ``new_ref``.

        Args:
          new_ref: The new SHA1 or symbolic ref value

        Raises:
          ValueError: If ``new_ref`` is neither a valid hex sha nor a symref.
        """
        lock = self._locked_file()

        if not valid_hexsha(new_ref) and not new_ref.startswith(SYMREF):
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Discard anything written earlier in this context.
        lock.seek(0)
        lock.truncate()
        lock.write(new_ref + b"\n")
        self._deleted = False

    def set_symbolic_ref(self, target: Ref) -> None:
        """Replace the pending contents with a symref to ``target``.

        Args:
          target: Name of the ref to point at
        """
        lock = self._locked_file()

        self._refs_container._check_refname(target)
        lock.seek(0)
        lock.truncate()
        lock.write(SYMREF + target + b"\n")
        self._deleted = False

    def delete(self) -> None:
        """Remove the loose and packed ref while the lock is held."""
        self._locked_file()

        realname = self._realname
        if realname:
            path = self._refs_container.refpath(realname)
            with suppress(FileNotFoundError):
                if os.path.lexists(path):
                    os.remove(path)
            self._refs_container._remove_packed_ref(realname)

        # Makes __exit__ abort the lock file instead of closing it.
        self._deleted = True
class NamespacedRefsContainer(RefsContainer):
    """Refs container view restricted to a single namespace.

    This implements Git's GIT_NAMESPACE feature: refs are stored under
    refs/namespaces/<namespace>/ in the wrapped container, and only refs
    within that namespace are exposed.

    Example:
        With namespace "foo", a ref "refs/heads/master" is stored as
        "refs/namespaces/foo/refs/heads/master" in the underlying container.
    """

    def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
        """Wrap ``refs`` so that all operations happen inside ``namespace``.

        Args:
          refs: The underlying refs container to wrap
          namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
        """
        super().__init__(logger=refs._logger)
        self._refs = refs
        # Each path component becomes its own nesting level:
        # foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
        self._namespace_prefix = b"".join(
            b"refs/namespaces/" + part + b"/" for part in namespace.split(b"/")
        )

    def _apply_namespace(self, name: bytes) -> bytes:
        """Map a ref name to its name in the underlying container."""
        # Only names under refs/ are namespaced; HEAD and other special
        # refs pass through unchanged.
        if name != HEADREF and name.startswith(b"refs/"):
            return self._namespace_prefix + name
        return name

    def _strip_namespace(self, name: bytes) -> bytes | None:
        """Map an underlying ref name back into this namespace.

        Returns None if the ref is not in our namespace.
        """
        # HEAD and other special refs are not namespaced
        if name == HEADREF or not name.startswith(b"refs/"):
            return name
        prefix = self._namespace_prefix
        return name[len(prefix) :] if name.startswith(prefix) else None

    def allkeys(self) -> set[bytes]:
        """Return all reference keys in this namespace."""
        candidates = (self._strip_namespace(key) for key in self._refs.allkeys())
        return {key for key in candidates if key is not None}

    def read_loose_ref(self, name: bytes) -> bytes | None:
        """Read a loose reference from the underlying container."""
        namespaced = self._apply_namespace(name)
        return self._refs.read_loose_ref(namespaced)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get packed refs within this namespace."""
        return {
            stripped: value
            for name, value in self._refs.get_packed_refs().items()
            if (stripped := self._strip_namespace(name)) is not None
        }

    def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
        """Add packed refs with namespace prefix."""
        namespaced_refs = {}
        for name, value in new_refs.items():
            namespaced_refs[self._apply_namespace(name)] = value
        self._refs.add_packed_refs(namespaced_refs)

    def get_peeled(self, name: bytes) -> ObjectID | None:
        """Return the cached peeled value of a ref."""
        namespaced = self._apply_namespace(name)
        return self._refs.get_peeled(namespaced)

    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> None:
        """Make a ref point at another ref; both names are namespaced."""
        source = self._apply_namespace(name)
        target = self._apply_namespace(other)
        self._refs.set_symbolic_ref(
            source,
            target,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: bytes | None,
        new_ref: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref."""
        namespaced = self._apply_namespace(name)
        return self._refs.set_if_equals(
            namespaced,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Add a new reference only if it does not already exist."""
        namespaced = self._apply_namespace(name)
        return self._refs.add_if_new(
            namespaced,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: bytes | None,
        committer: bytes | None = None,
        timestamp: int | None = None,
        timezone: int | None = None,
        message: bytes | None = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref."""
        namespaced = self._apply_namespace(name)
        return self._refs.remove_if_equals(
            namespaced,
            old_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Note: The call is forwarded unchanged, so it packs refs in the
        whole underlying container, not just those in the namespace.
        """
        self._refs.pack_refs(all=all)
def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
    """Filter refs to only include those with a given prefix.

    Args:
      refs: A dictionary of refs.
      prefixes: The prefixes to filter by. Any iterable is accepted,
        including one-shot iterators and generators.

    Returns:
      A new dictionary with the refs whose name starts with at least one of
      the prefixes.
    """
    # Materialise the prefixes once: re-iterating ``prefixes`` for every key
    # would exhaust a one-shot iterator after the first ref. A tuple can
    # also be handed to startswith() directly.
    wanted = tuple(prefixes)
    filtered = {k: v for k, v in refs.items() if k.startswith(wanted)}
    return filtered
def is_per_worktree_ref(ref: bytes) -> bool:
    """Tell whether a reference is kept per worktree rather than shared.

    Per-worktree references are:
    - all pseudorefs, e.g. HEAD
    - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"

    All refs starting with "refs/" are shared, except for the ones listed above.

    See https://git-scm.com/docs/git-worktree#_refs.
    """
    # Anything outside refs/ is a pseudoref.
    if not ref.startswith(b"refs/"):
        return True
    per_worktree_prefixes = (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
    return ref.startswith(per_worktree_prefixes)