Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import types
27import warnings
28from collections.abc import Iterator
29from contextlib import suppress
30from typing import TYPE_CHECKING, Any, Optional, Union
32if TYPE_CHECKING:
33 from .file import _GitFile
35from .errors import PackedRefsException, RefFormatError
36from .file import GitFile, ensure_dir_exists
37from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
38from .pack import ObjectContainer
# Ref names are handled as raw bytes throughout this module.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Marker that starts the contents of a symbolic ref.
SYMREF = b"ref: "
# Well-known ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values rejected by check_ref_format(). Iterating over bytes yields
# ints, so this is a set of ints rather than of one-byte strings.
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# Suffix appended to a ref name to denote its peeled value
# (see write_info_refs()).
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    Attributes:
      ref: Name of the ref at which resolution started
      depth: Number of symrefs that were followed before giving up
    """

    def __init__(self, ref, depth) -> None:
        # Forward the arguments to Exception so that str()/repr() show the
        # offending ref and so that copying or pickling the exception (which
        # re-invokes the constructor with ``args``) works.
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth
def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target of a symbolic ref.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The ref name the symref points at, with any trailing CR/LF
      characters removed
    Raises:
      ValueError: If contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")
def check_ref_format(refname: Ref) -> bool:
    """Check if a refname is correctly formatted.

    The checks are modelled on git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname passes every check, False otherwise
    """
    # The checks are kept separate, one per rule, to parallel [1].
    # No path component may start with a dot.
    if refname.startswith(b".") or b"/." in refname:
        return False
    # At least one slash is required; this also rejects the empty name.
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # Iterating over bytes yields ints: reject control characters (below
    # 0o40) and everything listed in BAD_REF_CHARS.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    if refname.endswith((b"/", b".")):
        return False
    if refname.endswith(b".lock"):
        return False
    if b"@{" in refname or b"\\" in refname:
        return False
    return True
def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name)

    Raises:
      ValueError: If ref does not start with the remote prefix or has no
        branch part after the remote name
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    # What follows the prefix is "<remote>/<branch>". Only the first slash
    # separates the two, so the branch part may itself contain slashes.
    remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
    remote_name, slash, branch_name = remainder.partition(b"/")
    if not slash:
        raise ValueError(f"Invalid remote ref format: {ref!r}")

    return (remote_name, branch_name)
class RefsContainer:
    """A container for refs.

    This base class implements the storage-independent operations (lookup,
    symref following, iteration helpers). The methods that raise
    NotImplementedError here are the storage primitives subclasses provide.
    """

    def __init__(self, logger=None) -> None:
        # Optional callable invoked by _log() for every recorded change.
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Report a ref change to the logger, if one is configured.

        Nothing is reported when no logger was given or when message is None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation has no cache and always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, ObjectID],
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import a mapping of refs below a base.

        Args:
          base: Prefix (without trailing slash) below which refs are stored
          other: Mapping of ref names relative to base to SHAs; a value of
            None requests removal of that ref
          committer: Forwarded to set_if_equals/remove_if_equals
          timestamp: Forwarded to set_if_equals/remove_if_equals
          timezone: Forwarded to set_if_equals/remove_if_equals
          message: Forwarded to set_if_equals/remove_if_equals
          prune: If True, refs below base that are absent from other are
            removed as well
        """
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                # Explicit deletion request; must not be undone below.
                to_delete.add(name)
                continue
            self.set_if_equals(
                b"/".join((base, name)),
                None,
                value,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
            # The ref was just written, so it must survive pruning.
            to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> Iterator[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        """Iterate over the names of all refs in this container."""
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; a trailing slash is ignored.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        base = base.rstrip(b"/")
        # Match on "<base>/" so that a sibling such as refs/headsfoo is not
        # mistaken for a child of refs/heads, and strip exactly that prefix.
        # An empty base selects every ref unchanged.
        prefix = base + b"/" if base else b""
        return {
            refname[len(prefix) :]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base=None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out of the result.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD (or refs/stash) and is
            otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            # Fall back to the packed refs when there is no loose ref.
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain; sha is falsy when the chain ends at a
            ref that does not exist
        Raises:
          SymrefLoop: if more than five links have been followed
        """
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        """Return True if refname exists (without following symrefs)."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference cannot be resolved
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref value
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            contents = self.read_ref(src)
            if contents is None:
                # The ref vanished between listing and reading; skip it.
                continue
            try:
                dst = parse_symref_value(contents)
            except ValueError:
                # Not a symref.
                continue
            ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
            all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)
class DictRefsContainer(RefsContainer):
    """RefsContainer that keeps all refs in a plain dict.

    Packed references are not supported (get_packed_refs() is always empty)
    and the container is not threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        super().__init__(logger=logger)
        # Mapping of ref name to ref contents; used as given, not copied.
        self._refs = refs
        # Cached peeled values, filled through _update_peeled().
        self._peeled: dict[bytes, ObjectID] = {}
        # Objects whose _notify() is called on every change.
        self._watchers: set[Any] = set()

    def allkeys(self):
        """Return the names of all refs held in the dict."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the stored contents for name, or None if it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container has no packed refs."""
        return {}

    def _notify(self, ref, newsha) -> None:
        """Tell every registered watcher that ref now has value newsha."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make name a symbolic ref pointing at other.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message passed on to the logger
        """
        # Resolve the previous value before overwriting it, for the log.
        _, previous = self.follow(name)
        symref = SYMREF + other
        self._refs[name] = symref
        self._notify(name, symref)
        self._log(
            name,
            previous,
            symref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set name to new_ref if its stored value currently equals old_ref.

        Args:
          name: The refname to set.
          old_ref: Expected current value, or None to set unconditionally;
            a missing ref compares as ZERO_SHA.
          new_ref: The new value for the ref.
          message: Optional message passed on to the logger
        Returns: True if the ref was set, False if the comparison failed.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Store ref under name unless name is already present.

        Returns: True if the ref was added, False if it already existed.
        """
        if name in self._refs:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove name if its stored value currently equals old_ref.

        Args:
          name: The refname to delete.
          old_ref: Expected current value, or None to delete unconditionally;
            a missing ref compares as ZERO_SHA.
          message: Optional message passed on to the logger
        Returns: True unless the comparison failed; removing a ref that does
          not exist also counts as success.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            removed = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                removed,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name):
        """Return the cached peeled value for name, or None if not cached."""
        return self._peeled.get(name)

    def _update(self, refs) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, value in refs.items():
            self.set_if_equals(refname, None, value)

    def _update_peeled(self, peeled) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f) -> None:
        """Load refs from an info/refs file.

        Args:
          f: File-like object, opened for reading bytes, holding the
            info/refs contents
        """
        # Initialise the base class so that inherited helpers relying on
        # its state (such as _log) do not hit a missing attribute.
        super().__init__()
        refs = read_info_refs(f)
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self):
        """Return the names of all refs read from the file."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the value recorded for name, or None if it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container has no packed refs."""
        return {}

    def get_peeled(self, name):
        """Return the peeled value of a ref.

        Falls back to the ref's own value when no peeled entry was read.

        Raises:
          KeyError: if name is not a known ref
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]
class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk."""

    def __init__(
        self,
        path: Union[str, bytes, os.PathLike],
        worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
        logger=None,
    ) -> None:
        """Create a container rooted at path.

        Args:
          path: Directory holding the refs and the packed-refs file
          worktree_path: Directory holding HEAD; defaults to path
          logger: Optional callable that receives ref changes
        """
        super().__init__(logger=logger)
        # Convert path-like objects to strings, then to bytes for Git compatibility
        self.path = os.fsencode(os.fspath(path))
        if worktree_path is None:
            self.worktree_path = self.path
        else:
            self.worktree_path = os.fsencode(os.fspath(worktree_path))
        # Caches filled lazily by get_packed_refs(); both are None until the
        # packed-refs file has been read.
        self._packed_refs = None
        self._peeled_refs = None

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.path!r})"

    def subkeys(self, base):
        """Refs present under a base, loose and packed.

        Args:
          base: The base to return refs under.
        Returns: A set of ref names with the base prefix stripped.
        """
        subkeys = set()
        path = self.refpath(base)
        for root, unused_dirs, files in os.walk(path):
            directory = root[len(path) :]
            if os.path.sep != "/":
                directory = directory.replace(os.fsencode(os.path.sep), b"/")
            directory = directory.strip(b"/")
            for filename in files:
                refname = b"/".join(([directory] if directory else []) + [filename])
                # check_ref_format requires at least one /, so we prepend the
                # base before calling it.
                if check_ref_format(base + b"/" + refname):
                    subkeys.add(refname)
        for key in self.get_packed_refs():
            if key.startswith(base):
                subkeys.add(key[len(base) :].strip(b"/"))
        return subkeys

    def allkeys(self):
        """Return the set of all ref names: HEAD, loose refs and packed refs."""
        allkeys = set()
        if os.path.exists(self.refpath(HEADREF)):
            allkeys.add(HEADREF)
        path = self.refpath(b"")
        refspath = self.refpath(b"refs")
        for root, unused_dirs, files in os.walk(refspath):
            directory = root[len(path) :]
            if os.path.sep != "/":
                directory = directory.replace(os.fsencode(os.path.sep), b"/")
            for filename in files:
                refname = b"/".join([directory, filename])
                if check_ref_format(refname):
                    allkeys.add(refname)
        allkeys.update(self.get_packed_refs())
        return allkeys

    def refpath(self, name):
        """Return the disk path of a ref."""
        if os.path.sep != "/":
            name = name.replace(b"/", os.fsencode(os.path.sep))
        # TODO: as the 'HEAD' reference is working tree specific, it
        # should actually not be a part of RefsContainer
        if name == HEADREF:
            return os.path.join(self.worktree_path, name)
        else:
            return os.path.join(self.path, name)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        # TODO: invalidate the cache on repacking
        if self._packed_refs is None:
            # set both to empty because we want _peeled_refs to be
            # None if and only if _packed_refs is also None.
            self._packed_refs = {}
            self._peeled_refs = {}
            path = os.path.join(self.path, b"packed-refs")
            try:
                f = GitFile(path, "rb")
            except FileNotFoundError:
                return {}
            with f:
                # An empty packed-refs file has no first line; treat it as
                # holding no refs instead of leaking StopIteration.
                first_line = next(iter(f), b"").rstrip()
                if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
                    for sha, name, peeled in read_packed_refs_with_peeled(f):
                        self._packed_refs[name] = sha
                        if peeled:
                            self._peeled_refs[name] = peeled
                else:
                    # No recognised header: parse from the start again.
                    f.seek(0)
                    for sha, name in read_packed_refs(f):
                        self._packed_refs[name] = sha
        return self._packed_refs

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        Raises:
          ValueError: if new_refs contains HEAD
        """
        if not new_refs:
            return

        path = os.path.join(self.path, b"packed-refs")

        with GitFile(path, "wb") as f:
            # Work on a copy of the packed refs while holding the lock.
            # NOTE: get_packed_refs() answers from its cache when one exists;
            # the cache is not invalidated here.
            packed_refs = self.get_packed_refs().copy()

            for ref, target in new_refs.items():
                # sanity check
                if ref == HEADREF:
                    raise ValueError("cannot pack HEAD")

                # remove any loose refs pointing to this one -- please
                # note that this bypasses remove_if_equals as we don't
                # want to affect packed refs in here
                with suppress(OSError):
                    os.remove(self.refpath(ref))

                if target is not None:
                    packed_refs[ref] = target
                else:
                    packed_refs.pop(ref, None)

            write_packed_refs(f, packed_refs, self._peeled_refs)

            self._packed_refs = packed_refs

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        self.get_packed_refs()
        if self._peeled_refs is None or name not in self._packed_refs:
            # No cache: no peeled refs were read, or this ref is loose
            return None
        if name in self._peeled_refs:
            return self._peeled_refs[name]
        else:
            # Known not peelable
            return self[name]

    def read_loose_ref(self, name):
        """Read a reference file and return its contents.

        If the reference file a symbolic reference, only read the first line of
        the file. Otherwise, only read the first 40 bytes.

        Args:
          name: the refname to read, relative to refpath
        Returns: The contents of the ref file, or None if the file does not
            exist or cannot be read.
        """
        filename = self.refpath(name)
        try:
            with GitFile(filename, "rb") as f:
                header = f.read(len(SYMREF))
                if header == SYMREF:
                    # Read only the first line. A file that ends right after
                    # the header has no further line; use an empty target
                    # instead of leaking StopIteration.
                    return header + next(iter(f), b"").rstrip(b"\r\n")
                else:
                    # Read only the first 40 bytes
                    return header + f.read(40 - len(SYMREF))
        except (OSError, UnicodeError):
            # don't assume anything specific about the error; in
            # particular, invalid or forbidden paths can raise weird
            # errors depending on the specific operating system
            return None

    def _remove_packed_ref(self, name) -> None:
        """Remove name from the packed-refs file, if it is listed there.

        Does nothing when the packed refs have never been loaded.
        """
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            self._packed_refs = None
            self.get_packed_refs()

            if name not in self._packed_refs:
                # Nothing to rewrite.
                f.abort()
                return

            del self._packed_refs[name]
            with suppress(KeyError):
                del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        except BaseException:
            f.abort()
            raise

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message to describe the change
        Raises:
          RefFormatError: if name or other is not an acceptable ref name
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, "wb")
        try:
            f.write(SYMREF + other + b"\n")
            sha = self.follow(name)[-1]
            self._log(
                name,
                sha,
                sha,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        except BaseException:
            f.abort()
            raise
        else:
            f.close()

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.
        Raises:
          RefFormatError: if name is not an acceptable ref name
          NotADirectoryError: if a parent of the ref is itself a packed ref
        """
        self._check_refname(name)
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock to handle race conditions
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise

            # Check if ref already has the desired value while holding the lock
            # This avoids fsync when ref is unchanged but still detects lock conflicts
            current_ref = self.read_loose_ref(realname)
            if current_ref is None:
                current_ref = packed_refs.get(realname, None)

            if current_ref is not None and current_ref == new_ref:
                # Ref already has desired value, abort write to avoid fsync
                f.abort()
                return True

            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        This method follows symrefs, and only ensures that the last ref in the
        chain does not exist.

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
          message: Optional message for reflog
        Returns: True if the add was successful, False otherwise.
        """
        try:
            realnames, contents = self.follow(name)
            if contents is not None:
                return False
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        self._check_refname(realname)
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            # Check again now that the lock is held.
            if os.path.exists(filename) or name in self.get_packed_refs():
                f.abort()
                return False
            try:
                f.write(ref + b"\n")
            except OSError:
                f.abort()
                raise
            else:
                self._log(
                    name,
                    None,
                    ref,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Optional message
        Returns: True if the delete was successful, False otherwise.
        """
        self._check_refname(name)
        filename = self.refpath(name)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        try:
            if old_ref is not None:
                orig_ref = self.read_loose_ref(name)
                if orig_ref is None:
                    orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
                if orig_ref != old_ref:
                    return False

            # remove the reference file itself
            try:
                found = os.path.lexists(filename)
            except OSError:
                # may only be packed, or otherwise unstorable
                found = False

            if found:
                os.remove(filename)

            self._remove_packed_ref(name)
            self._log(
                name,
                old_ref,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        finally:
            # never write, we just wanted the lock
            f.abort()

        # outside of the lock, clean-up any parent directory that might now
        # be empty. this ensures that re-creating a reference of the same
        # name of what was previously a directory works as expected
        parent = name
        while True:
            try:
                parent, _ = parent.rsplit(b"/", 1)
            except ValueError:
                break

            if parent == b"refs":
                break
            parent_filename = self.refpath(parent)
            try:
                os.rmdir(parent_filename)
            except OSError:
                # this can be caused by the parent directory being
                # removed by another process, being not empty, etc.
                # in any case, this is non fatal because we already
                # removed the reference, just ignore it
                break

        return True

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
            all: If True, pack all refs. If False, only pack tags.
        """
        refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
        for ref in self.allkeys():
            if ref == HEADREF:
                # Never pack HEAD
                continue
            if all or ref.startswith(LOCAL_TAG_PREFIX):
                try:
                    sha = self[ref]
                    if sha:
                        refs_to_pack[ref] = sha
                except KeyError:
                    # Broken ref, skip it
                    pass

        if refs_to_pack:
            self.add_packed_refs(refs_to_pack)
def _split_ref_line(line):
    """Parse one "<sha> <name>" line of a packed-refs file.

    Args:
      line: The raw line, with or without its line terminator
    Returns: Tuple of (sha, name)
    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the sha is not a valid hex sha, or the name
        fails check_ref_format
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    hexsha, refname = parts
    if not valid_hexsha(hexsha):
        raise PackedRefsException(f"Invalid hex sha {hexsha!r}")
    if not check_ref_format(refname):
        raise PackedRefsException(f"invalid ref name {refname!r}")
    return (hexsha, refname)
def read_packed_refs(f):
    """Read a packed refs file that carries no peeled entries.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a peeled ("^") line or a malformed ref line
        is encountered
    """
    for raw in f:
        if raw.startswith(b"^"):
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        # Lines starting with "#" are comments and carry no ref.
        if not raw.startswith(b"#"):
            yield _split_ref_line(raw)
def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
    with SHA1s, ref names, and peeled SHA1s (or None).

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled line has no preceding ref line or
        holds an invalid sha, or if a ref line is malformed
    """
    # A ref line is held back until the next line shows whether a peeled
    # ("^<sha>") entry belongs to it.
    last = None
    for line in f:
        # Use startswith(): indexing bytes yields an int, which never equals
        # b"#", and indexing an empty line would raise IndexError.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            if not valid_hexsha(line[1:]):
                raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, line[1:])
        else:
            if last:
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    # Flush the final ref line, which had no peeled entry after it.
    if last:
        sha, name = _split_ref_line(last)
        yield (sha, name, None)
def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
    """Serialize packed refs to a file.

    When ``peeled_refs`` is given (even if empty) the
    "# pack-refs with: peeled" header is written first, and every ref that
    has a peeled value is followed by a ``^<peeled>`` line. Refs are written
    in sorted order of their names.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
        peeled = peeled_refs
    else:
        peeled = {}

    for name in sorted(packed_refs.keys()):
        sha = packed_refs[name]
        f.write(git_line(sha, name))
        if name in peeled:
            f.write(b"^" + peeled[name] + b"\n")
def read_info_refs(f):
    """Parse an info/refs style listing.

    Each line is expected to be ``<sha>\\t<name>`` with an optional trailing
    CR/LF; a line without a tab raises ``ValueError``.

    Args:
      f: file-like object providing ``readlines()`` that returns byte lines
    Returns: dict mapping ref name to sha.
    """
    pairs = (entry.rstrip(b"\r\n").split(b"\t", 1) for entry in f.readlines())
    return {name: sha for (sha, name) in pairs}
def write_info_refs(refs, store: ObjectContainer):
    """Generate the lines of an info/refs listing.

    Refs are emitted in sorted order as ``<id>\\t<name>\\n``. HEAD is skipped,
    as are refs whose sha is not present in ``store``. When peeling the sha
    yields an object with a different id, an additional line for
    ``<name> + PEELED_TAG_SUFFIX`` carrying the peeled id follows.

    Args:
      refs: dict of ref name to sha
      store: object container used to look up and peel the shas
    Returns: Iterator over byte lines.
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for refname, refsha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name_is_head := (refname == HEADREF):
            continue
        try:
            obj = store[refsha]
        except KeyError:
            # Object missing from the store: do not advertise the ref.
            continue
        _unpeeled, peeled_obj = peel_sha(store, refsha)
        yield obj.id + b"\t" + refname + b"\n"
        if obj.id != peeled_obj.id:
            yield peeled_obj.id + b"\t" + refname + PEELED_TAG_SUFFIX + b"\n"
def is_local_branch(x):
    """Return whether ref name ``x`` starts with ``LOCAL_BRANCH_PREFIX``."""
    has_branch_prefix = x.startswith(LOCAL_BRANCH_PREFIX)
    return has_branch_prefix
def strip_peeled_refs(refs):
    """Return a copy of ``refs`` without entries ending in ``PEELED_TAG_SUFFIX``.

    Args:
      refs: dict of ref name to sha
    Returns: new dict holding only the non-peeled entries.
    """
    kept = {}
    for name, value in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = value
    return kept
def split_peeled_refs(refs):
    """Separate peeled entries from regular ones.

    Entries whose name ends in ``PEELED_TAG_SUFFIX`` are stored in the peeled
    dict under the name with that suffix removed; everything else goes into
    the regular dict unchanged.

    Args:
      refs: dict of ref name to sha
    Returns: Tuple of (regular, peeled) dicts.
    """
    regular, peeled = {}, {}
    for name, value in refs.items():
        if not name.endswith(PEELED_TAG_SUFFIX):
            regular[name] = value
            continue
        base_name = name[: -len(PEELED_TAG_SUFFIX)]
        peeled[base_name] = value
    return regular, peeled
1264def _set_origin_head(refs, origin, origin_head) -> None:
1265 # set refs/remotes/origin/HEAD
1266 origin_base = b"refs/remotes/" + origin + b"/"
1267 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
1268 origin_ref = origin_base + HEADREF
1269 target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
1270 if target_ref in refs:
1271 refs.set_symbolic_ref(origin_ref, target_ref)
def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Choose the ref HEAD should use and create the local ref for it.

    If ``branch`` is given, it is resolved first as a remote-tracking branch
    of ``origin`` (creating the matching local branch if new) and then as a
    tag. Otherwise ``origin_head`` is used, and a local ref with that name is
    created from its remote-tracking counterpart when that one exists.

    Args:
      refs: refs container to update
      origin: remote name
      origin_head: ref the remote HEAD points at, if known
      branch: explicitly requested branch or tag name, or a falsy value
      ref_message: message passed on to ``add_if_new``
    Returns: the name of the ref HEAD should refer to.
    Raises:
      ValueError: if ``branch`` is neither a known remote branch nor a tag,
        or if neither ``branch`` nor ``origin_head`` is provided.
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"

    if branch:
        tracking_ref = remote_prefix + branch
        if tracking_ref in refs:
            local_branch = LOCAL_BRANCH_PREFIX + branch
            refs.add_if_new(local_branch, refs[tracking_ref], ref_message)
            return local_branch
        tag_ref = LOCAL_TAG_PREFIX + branch
        if tag_ref in refs:
            return tag_ref
        raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")

    if not origin_head:
        raise ValueError("neither origin_head nor branch are provided")

    if origin_head.startswith(LOCAL_BRANCH_PREFIX):
        source_ref = remote_prefix + origin_head[len(LOCAL_BRANCH_PREFIX) :]
    else:
        source_ref = origin_head
    # A missing source ref is not an error; HEAD simply keeps the name.
    with suppress(KeyError):
        refs.add_if_new(origin_head, refs[source_ref], ref_message)
    return origin_head
def _set_head(refs, head_ref, ref_message):
    """Update HEAD to refer to ``head_ref``.

    For a ref under ``LOCAL_TAG_PREFIX`` the existing HEAD is deleted and
    HEAD is set directly to the looked-up value (detached). For any other
    ref HEAD becomes a symbolic ref to ``head_ref``; if a ``KeyError`` is
    raised along the way, None is returned instead.

    Args:
      refs: refs container to update
      head_ref: name of the ref HEAD should refer to
      ref_message: message passed on to ``set_if_equals``
    Returns: the value HEAD was set to, or None if the branch lookup failed.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            return None
        return target

    # detach HEAD at specified tag
    target = refs[head_ref]
    if isinstance(target, Tag):
        _cls, obj = target.object
        target = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, target, message=ref_message)
    return target
def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[str, str],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import the branches and tags advertised by a remote.

    Branches (refs under ``LOCAL_BRANCH_PREFIX``) are imported below
    ``refs/remotes/<remote_name>``; tags (refs under ``LOCAL_TAG_PREFIX``)
    are imported below ``LOCAL_TAG_PREFIX``. Peeled entries are ignored.

    Args:
      refs_container: refs container to import into
      remote_name: name of the remote
      refs: refs advertised by the remote
      message: message passed on to ``import_refs``
      prune: ``prune`` flag for the branch import
      prune_tags: ``prune`` flag for the tag import
    """
    stripped_refs = strip_peeled_refs(refs)

    branches = {}
    for name, value in stripped_refs.items():
        if name.startswith(LOCAL_BRANCH_PREFIX):
            branches[name[len(LOCAL_BRANCH_PREFIX) :]] = value
    remote_base = b"refs/remotes/" + remote_name.encode()
    refs_container.import_refs(remote_base, branches, message=message, prune=prune)

    tags = {}
    for name, value in stripped_refs.items():
        if name.startswith(LOCAL_TAG_PREFIX) and not name.endswith(PEELED_TAG_SUFFIX):
            tags[name[len(LOCAL_TAG_PREFIX) :]] = value
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )
def serialize_refs(store, refs):
    """Build a ref dict that also carries peeled entries for tags.

    Every ref is mapped to the id of the object it points at. When that
    object is a ``Tag``, an extra ``<ref> + PEELED_TAG_SUFFIX`` entry holding
    the peeled object's id is added. Refs for which peeling raises
    ``KeyError`` are skipped with a ``UserWarning``.

    Args:
      store: object store used to peel the shas
      refs: dict of ref name to sha
    Returns: dict of ref name to object id.
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for name, value in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, value)
        except KeyError:
            warnings.warn(
                "ref {} points at non-present sha {}".format(
                    name.decode("utf-8", "replace"), value.decode("ascii")
                ),
                UserWarning,
            )
            continue
        if isinstance(unpeeled, Tag):
            serialized[name + PEELED_TAG_SUFFIX] = peeled.id
        serialized[name] = unpeeled.id
    return serialized
class locked_ref:
    """Hold a lock on a ref while it is inspected or modified.

    Use as a context manager. On entry the ref name is resolved through
    ``follow`` and a ``GitFile`` is opened for writing on the resolved ref's
    path. On exit the file is closed, or aborted if an exception occurred or
    the ref was deleted.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        self._refs_container = refs_container
        self._refname = refname
        # Set once the context has been entered.
        self._file: Optional[_GitFile] = None
        self._realname: Optional[Ref] = None
        self._deleted = False

    def __enter__(self) -> "locked_ref":
        container = self._refs_container
        container._check_refname(self._refname)
        try:
            chain, _ = container.follow(self._refname)
            self._realname = chain[-1]
        except (KeyError, IndexError, SymrefLoop):
            # Could not resolve the ref; operate on the given name directly.
            self._realname = self._refname

        path = container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(path))
        self._file = GitFile(path, "wb")
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if not self._file:
            return
        if exc_type is None and not self._deleted:
            self._file.close()
        else:
            # Nothing to keep: an error occurred or the ref was deleted.
            self._file.abort()

    def _rewrite(self, payload: bytes) -> None:
        """Replace the pending file contents with ``payload`` and a newline."""
        self._file.seek(0)
        self._file.truncate()
        self._file.write(payload + b"\n")
        self._deleted = False

    def get(self) -> Optional[bytes]:
        """Get the current value of the ref.

        The loose ref is consulted first, then the packed refs.
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        loose_value = self._refs_container.read_loose_ref(self._realname)
        if loose_value is not None:
            return loose_value
        return self._refs_container.get_packed_refs().get(self._realname, None)

    def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
        """Check whether the ref currently has the expected value.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        return self.get() == expected_value

    def set(self, new_ref: bytes) -> None:
        """Set the ref to a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value
        Raises:
          RuntimeError: if called outside the context
          ValueError: if ``new_ref`` is neither a valid hex sha nor a symref
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        acceptable = valid_hexsha(new_ref) or new_ref.startswith(SYMREF)
        if not acceptable:
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        self._rewrite(new_ref)

    def set_symbolic_ref(self, target: Ref) -> None:
        """Make this ref point at another ref.

        Args:
          target: Name of the ref to point at
        Raises:
          RuntimeError: if called outside the context
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        self._refs_container._check_refname(target)
        self._rewrite(SYMREF + target)

    def delete(self) -> None:
        """Delete the ref file while holding the lock."""
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        # Delete the actual ref file while holding the lock
        if self._realname:
            path = self._refs_container.refpath(self._realname)
            # The file may vanish between the check and the removal.
            with suppress(FileNotFoundError):
                if os.path.lexists(path):
                    os.remove(path)
            self._refs_container._remove_packed_ref(self._realname)

        self._deleted = True