Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import warnings
27from collections.abc import Iterator
28from contextlib import suppress
29from typing import Any, Optional, Union
31from .errors import PackedRefsException, RefFormatError
32from .file import GitFile, ensure_dir_exists
33from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
34from .pack import ObjectContainer
# Ref names are handled as raw bytes throughout this module.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Marker at the start of a ref's contents when it points at another ref.
SYMREF = b"ref: "
# Well-known ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values that check_ref_format() rejects anywhere in a ref name.
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# Suffix appended to a ref name to denote its peeled value.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    Attributes:
      ref: The ref name that was being resolved
      depth: Number of symref hops followed before giving up
    """

    def __init__(self, ref, depth) -> None:
        # Hand the values to Exception as well so that str(), repr() and
        # .args carry useful information instead of being empty.
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth
def parse_symref_value(contents: bytes) -> bytes:
    """Extract the destination ref from the raw contents of a symref.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The destination ref name with trailing CR/LF removed
    Raises:
      ValueError: if contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")
def check_ref_format(refname: Ref) -> bool:
    """Tell whether a refname is well formed.

    The individual checks mirror the rules listed by
    git-check-ref-format[1] and are kept separate to parallel that list.

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # No path component may begin with a dot.
    if refname.startswith(b".") or b"/." in refname:
        return False
    # At least one slash is required (this also rejects the empty name).
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # No control characters and none of the explicitly forbidden bytes.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    if refname.endswith((b"/", b".")):
        return False
    if refname.endswith(b".lock"):
        return False
    return b"@{" not in refname and b"\\" not in refname
class RefsContainer:
    """A container for refs.

    This base class implements the read-side helpers on top of a handful of
    primitives (allkeys, read_loose_ref, get_packed_refs, set_if_equals,
    add_if_new, remove_if_equals, ...) that raise NotImplementedError here
    and are meant to be provided by subclasses.
    """

    def __init__(self, logger=None) -> None:
        # Optional callable invoked by _log() for every recorded ref change
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Forward a ref change to the logger, if any.

        Nothing is logged when no logger was configured or when no message
        was given.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, ObjectID],
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import a set of refs underneath a base.

        Args:
          base: Prefix under which the refs are stored
          other: Mapping of ref names (relative to base) to SHAs; a value
            of None requests removal of that ref
          committer: Optional committer, forwarded for logging
          timestamp: Optional timestamp, forwarded for logging
          timezone: Optional timezone, forwarded for logging
          message: Optional message, forwarded for logging
          prune: If True, remove refs under base that are not in other
        """
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
            else:
                self.set_if_equals(
                    b"/".join((base, name)),
                    None,
                    value,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
                # This ref was just written, so it must survive pruning
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> Iterator[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; a trailing slash is optional.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        # Match on whole path components only: "refs/heads" must not match
        # "refs/headsfoo/bar", and a trailing slash on base must not cause
        # an extra character to be stripped from the result.
        stripped = base.rstrip(b"/")
        prefix = stripped + b"/" if stripped else b""
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base=None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            # Fall back to the packed refs when there is no loose ref
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain; sha is falsy when the chain ends at a
            ref that has no contents
        Raises:
          SymrefLoop: if more than 5 refs had to be read
        """
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        return bool(self.read_ref(refname))

    def __getitem__(self, name) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the ref does not resolve to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            contents = self.read_ref(src)
            if contents is None:
                # The ref could not be read, so it cannot be reported as a
                # symref; skipping avoids calling bytes methods on None.
                continue
            try:
                ret[src] = parse_symref_value(contents)
            except ValueError:
                # Not a symref
                pass
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)
class DictRefsContainer(RefsContainer):
    """RefsContainer that keeps every ref in a plain dict.

    No packed refs are stored (get_packed_refs() is always empty) and the
    container is not threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        super().__init__(logger=logger)
        # The dict is used as-is, not copied
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self):
        """Return the names of all refs held in the backing dict."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the stored value for name, or None if it is absent."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container has no packed refs."""
        return {}

    def _notify(self, ref, newsha) -> None:
        """Tell every registered watcher about a change to ref."""
        event = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(event)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Store a symref from name to other, then notify and log it."""
        # Resolve the previous value before overwriting, for the log entry
        _, previous = self.follow(name)
        symref_value = SYMREF + other
        self._refs[name] = symref_value
        self._notify(name, symref_value)
        self._log(
            name,
            previous,
            symref_value,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set name to new_ref if its current value equals old_ref.

        A missing ref compares as ZERO_SHA; old_ref=None skips the check.
        Only the named ref itself is updated, not a symref chain.

        Returns: True if the ref was set, False if the comparison failed.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add name -> ref unless name is already present.

        Returns: True if the ref was added, False if it already existed.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove name if its current value equals old_ref.

        A missing ref compares as ZERO_SHA; old_ref=None skips the check.
        Removing a ref that does not exist still counts as success, but
        triggers no notification or log entry.

        Returns: True unless the comparison failed.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name):
        """Return the cached peeled value for name, or None if unknown."""
        return self._peeled.get(name, None)

    def _update(self, refs) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname in refs:
            self.set_if_equals(refname, None, refs[refname])

    def _update_peeled(self, peeled) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f) -> None:
        """Load refs from an info/refs file.

        Args:
          f: File-like object to read from; passed to read_info_refs()
        """
        # Initialise the base class so the attributes it relies on (such as
        # the logger) exist on this instance too.
        super().__init__()
        refs = read_info_refs(f)
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self):
        """Return the names of all non-peeled refs that were read."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the value read for name, or None if it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; info/refs carries no packed refs."""
        return {}

    def get_peeled(self, name):
        """Return the peeled value of a ref.

        Falls back to the value of the ref itself when no peeled entry was
        read for it.

        Raises:
          KeyError: if name is not a known ref
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]
class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk."""

    def __init__(
        self,
        path: Union[str, bytes, os.PathLike],
        worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
        logger=None,
    ) -> None:
        """Create a container rooted at path.

        Args:
          path: Directory holding the refs and the packed-refs file
          worktree_path: Directory holding HEAD; defaults to path
          logger: Optional callable used to record ref changes
        """
        super().__init__(logger=logger)
        # Convert path-like objects to strings, then to bytes for Git compatibility
        self.path = os.fsencode(os.fspath(path))
        if worktree_path is None:
            self.worktree_path = self.path
        else:
            self.worktree_path = os.fsencode(os.fspath(worktree_path))
        # Both caches are filled lazily by get_packed_refs()
        self._packed_refs = None
        self._peeled_refs = None

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.path!r})"

    def subkeys(self, base):
        """Refs present on disk or in packed-refs under a base.

        Args:
          base: The base to return refs under.
        Returns: A set of ref names with the base prefix stripped.
        """
        subkeys = set()
        path = self.refpath(base)
        for root, unused_dirs, files in os.walk(path):
            directory = root[len(path) :]
            if os.path.sep != "/":
                directory = directory.replace(os.fsencode(os.path.sep), b"/")
            directory = directory.strip(b"/")
            for filename in files:
                refname = b"/".join(([directory] if directory else []) + [filename])
                # check_ref_format requires at least one /, so we prepend the
                # base before calling it.
                if check_ref_format(base + b"/" + refname):
                    subkeys.add(refname)
        # Match packed refs on whole path components only, so that
        # "refs/heads" does not pick up e.g. "refs/headsfoo/bar".
        stripped = base.rstrip(b"/")
        prefix = stripped + b"/" if stripped else b""
        for key in self.get_packed_refs():
            if key.startswith(prefix):
                subkeys.add(key[len(prefix) :].strip(b"/"))
        return subkeys

    def allkeys(self):
        """Return the set of all refs: HEAD (if present), loose and packed."""
        allkeys = set()
        if os.path.exists(self.refpath(HEADREF)):
            allkeys.add(HEADREF)
        path = self.refpath(b"")
        refspath = self.refpath(b"refs")
        for root, unused_dirs, files in os.walk(refspath):
            directory = root[len(path) :]
            if os.path.sep != "/":
                directory = directory.replace(os.fsencode(os.path.sep), b"/")
            for filename in files:
                refname = b"/".join([directory, filename])
                if check_ref_format(refname):
                    allkeys.add(refname)
        allkeys.update(self.get_packed_refs())
        return allkeys

    def refpath(self, name):
        """Return the disk path of a ref."""
        if os.path.sep != "/":
            name = name.replace(b"/", os.fsencode(os.path.sep))
        # TODO: as the 'HEAD' reference is working tree specific, it
        # should actually not be a part of RefsContainer
        if name == HEADREF:
            return os.path.join(self.worktree_path, name)
        else:
            return os.path.join(self.path, name)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        # TODO: invalidate the cache on repacking
        if self._packed_refs is None:
            # set both to empty because we want _peeled_refs to be
            # None if and only if _packed_refs is also None.
            self._packed_refs = {}
            self._peeled_refs = {}
            path = os.path.join(self.path, b"packed-refs")
            try:
                f = GitFile(path, "rb")
            except FileNotFoundError:
                return {}
            with f:
                # An empty packed-refs file has no first line; treat it as
                # a file without header rather than raising StopIteration.
                first_line = next(iter(f), b"").rstrip()
                if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
                    for sha, name, peeled in read_packed_refs_with_peeled(f):
                        self._packed_refs[name] = sha
                        if peeled:
                            self._peeled_refs[name] = peeled
                else:
                    # No recognised header: the first line may be a ref
                    f.seek(0)
                    for sha, name in read_packed_refs(f):
                        self._packed_refs[name] = sha
        return self._packed_refs

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        Raises:
          ValueError: if new_refs contains HEAD
        """
        if not new_refs:
            return

        path = os.path.join(self.path, b"packed-refs")

        with GitFile(path, "wb") as f:
            # reread cached refs from disk, while holding the lock
            packed_refs = self.get_packed_refs().copy()

            for ref, target in new_refs.items():
                # sanity check
                if ref == HEADREF:
                    raise ValueError("cannot pack HEAD")

                # remove any loose refs pointing to this one -- please
                # note that this bypasses remove_if_equals as we don't
                # want to affect packed refs in here
                with suppress(OSError):
                    os.remove(self.refpath(ref))

                if target is not None:
                    packed_refs[ref] = target
                else:
                    packed_refs.pop(ref, None)

            write_packed_refs(f, packed_refs, self._peeled_refs)

            self._packed_refs = packed_refs

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        self.get_packed_refs()
        if self._peeled_refs is None or name not in self._packed_refs:
            # No cache: no peeled refs were read, or this ref is loose
            return None
        if name in self._peeled_refs:
            return self._peeled_refs[name]
        else:
            # Known not peelable
            return self[name]

    def read_loose_ref(self, name):
        """Read a reference file and return its contents.

        If the reference file a symbolic reference, only read the first line of
        the file. Otherwise, only read the first 40 bytes.

        Args:
          name: the refname to read, relative to refpath
        Returns: The contents of the ref file, or None if the file could not
            be read (OSError and UnicodeError are swallowed).
        """
        filename = self.refpath(name)
        try:
            with GitFile(filename, "rb") as f:
                header = f.read(len(SYMREF))
                if header == SYMREF:
                    # Read only the first line; a file holding nothing but
                    # the marker yields an empty target instead of raising
                    # StopIteration.
                    return header + next(iter(f), b"").rstrip(b"\r\n")
                else:
                    # Read only the first 40 bytes
                    return header + f.read(40 - len(SYMREF))
        except (OSError, UnicodeError):
            # don't assume anything specific about the error; in
            # particular, invalid or forbidden paths can raise weird
            # errors depending on the specific operating system
            return None

    def _remove_packed_ref(self, name) -> None:
        """Drop name from the packed-refs file, if it is listed there.

        Does nothing when the packed refs have not been loaded yet.
        """
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            self._packed_refs = None
            self.get_packed_refs()

            if name not in self._packed_refs:
                return

            del self._packed_refs[name]
            with suppress(KeyError):
                del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        finally:
            # Reached on every path, including after a successful close()
            f.abort()

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message to describe the change
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, "wb")
        try:
            f.write(SYMREF + other + b"\n")
            sha = self.follow(name)[-1]
            self._log(
                name,
                sha,
                sha,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        except BaseException:
            f.abort()
            raise
        else:
            f.close()

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.
        Raises:
          NotADirectoryError: if an ancestor of the ref is itself a packed ref
        """
        self._check_refname(name)
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise
            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        This method follows symrefs, and only ensures that the last ref in the
        chain does not exist.

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
          message: Optional message for reflog
        Returns: True if the add was successful, False otherwise.
        """
        try:
            realnames, contents = self.follow(name)
            if contents is not None:
                return False
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        self._check_refname(realname)
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            # Re-check for existence now that the file is opened for writing
            if os.path.exists(filename) or name in self.get_packed_refs():
                f.abort()
                return False
            try:
                f.write(ref + b"\n")
            except OSError:
                f.abort()
                raise
            else:
                self._log(
                    name,
                    None,
                    ref,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Optional message
        Returns: True if the delete was successful, False otherwise.
        """
        self._check_refname(name)
        filename = self.refpath(name)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        try:
            if old_ref is not None:
                orig_ref = self.read_loose_ref(name)
                if orig_ref is None:
                    orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
                if orig_ref != old_ref:
                    return False

            # remove the reference file itself
            try:
                found = os.path.lexists(filename)
            except OSError:
                # may only be packed, or otherwise unstorable
                found = False

            if found:
                os.remove(filename)

            self._remove_packed_ref(name)
            self._log(
                name,
                old_ref,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        finally:
            # never write, we just wanted the lock
            f.abort()

        # outside of the lock, clean-up any parent directory that might now
        # be empty. this ensures that re-creating a reference of the same
        # name of what was previously a directory works as expected
        parent = name
        while True:
            try:
                parent, _ = parent.rsplit(b"/", 1)
            except ValueError:
                break

            if parent == b"refs":
                break
            parent_filename = self.refpath(parent)
            try:
                os.rmdir(parent_filename)
            except OSError:
                # this can be caused by the parent directory being
                # removed by another process, being not empty, etc.
                # in any case, this is non fatal because we already
                # removed the reference, just ignore it
                break

        return True

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
        for ref in self.allkeys():
            if ref == HEADREF:
                # Never pack HEAD
                continue
            if all or ref.startswith(LOCAL_TAG_PREFIX):
                try:
                    sha = self[ref]
                    if sha:
                        refs_to_pack[ref] = sha
                except KeyError:
                    # Broken ref, skip it
                    pass

        if refs_to_pack:
            self.add_packed_refs(refs_to_pack)
def _split_ref_line(line):
    """Parse one ``<sha> <name>`` line into a validated (sha, name) tuple.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the sha is not valid hex, or the ref name
        is malformed
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    sha, name = parts
    sha_ok = valid_hexsha(sha)
    if not sha_ok:
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    name_ok = check_ref_format(name)
    if not name_ok:
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (sha, name)
def read_packed_refs(f):
    """Iterate over the entries of a packed refs file without peeled lines.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a peeled ("^") line or a malformed ref line
        is encountered
    """
    for raw in f:
        marker = raw[:1]
        if marker == b"#":
            # Comment
            continue
        if marker == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(raw)
def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
    of (SHA1, ref name, peeled SHA1 or None).

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled line has no preceding ref line or
        carries an invalid sha, or if a ref line is malformed
    """
    # The most recent ref line; held back until we know whether the next
    # line is its peeled ("^") companion.
    last = None
    for line in f:
        # startswith() rather than line[0] == b"#": indexing bytes gives an
        # int on Python 3, so that comparison never matched, and it raised
        # IndexError on an empty line.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            if not valid_hexsha(line[1:]):
                raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, line[1:])
        else:
            if last:
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        sha, name = _split_ref_line(last)
        yield (sha, name, None)
def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
    """Serialise packed refs to a file, sorted by ref name.

    The "with: peeled" header is written only when peeled_refs is given
    (even if it is empty).

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    else:
        peeled_refs = {}
    for refname in sorted(packed_refs):
        sha = packed_refs[refname]
        f.write(git_line(sha, refname))
        if refname in peeled_refs:
            # The peeled value follows its ref on a line starting with "^"
            f.write(b"^" + peeled_refs[refname] + b"\n")
def read_info_refs(f):
    """Parse an info/refs style listing.

    Each line is expected to be ``<sha>\\t<name>``; trailing CR/LF is
    stripped.

    Args:
      f: file-like object to read lines from
    Returns: dict mapping ref name to sha
    """
    pairs = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {name: sha for sha, name in pairs}
def write_info_refs(refs, store: ObjectContainer):
    """Generate the lines of an info/refs listing.

    Refs are emitted in sorted order. HEAD and refs whose object is not in
    the store are skipped. When peeling yields a different object, an extra
    line for ``<name>^{}`` follows.

    Args:
      refs: dict mapping ref name to sha
      store: object store used to look up and peel the objects
    Returns: Iterator over the generated lines (bytes)
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for name, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _, peeled = peel_sha(store, sha)
        yield b"".join((obj.id, b"\t", name, b"\n"))
        if obj.id != peeled.id:
            yield b"".join((peeled.id, b"\t", name, PEELED_TAG_SUFFIX, b"\n"))
def is_local_branch(x):
    """Return whether ref name ``x`` starts with LOCAL_BRANCH_PREFIX."""
    has_branch_prefix = x.startswith(LOCAL_BRANCH_PREFIX)
    return has_branch_prefix
def strip_peeled_refs(refs):
    """Return a copy of ``refs`` without the peeled entries.

    An entry counts as peeled when its ref name ends with PEELED_TAG_SUFFIX.
    """
    kept = {}
    for name, sha in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = sha
    return kept
def split_peeled_refs(refs):
    """Partition ``refs`` into regular and peeled entries.

    Returns: tuple ``(regular, peeled)`` of dicts. Keys of ``peeled`` have the
      trailing PEELED_TAG_SUFFIX removed; keys of ``regular`` are unchanged.
    """
    regular = {}
    peeled = {}
    for name, sha in refs.items():
        if not name.endswith(PEELED_TAG_SUFFIX):
            regular[name] = sha
            continue
        # Drop the suffix so the key matches the ref it belongs to.
        base_name = name[: -len(PEELED_TAG_SUFFIX)]
        peeled[base_name] = sha
    return regular, peeled
def _set_origin_head(refs, origin, origin_head) -> None:
    """Point refs/remotes/<origin>/HEAD at the remote's default branch.

    Nothing happens unless ``origin_head`` is a local-branch ref name and the
    matching refs/remotes/<origin>/<branch> ref is present in ``refs``.

    Args:
      refs: refs container to update
      origin: remote name, as bytes
      origin_head: ref name the remote HEAD points at, or a false value
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"
    if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    branch_name = origin_head[len(LOCAL_BRANCH_PREFIX) :]
    tracking_ref = remote_prefix + branch_name
    if tracking_ref in refs:
        # set refs/remotes/origin/HEAD
        refs.set_symbolic_ref(remote_prefix + HEADREF, tracking_ref)
def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    Args:
      refs: refs container to update
      origin: remote name
      origin_head: ref the remote HEAD points at, used when ``branch`` is empty
      branch: explicitly requested branch (or tag) name; takes precedence
      ref_message: message passed through to ``refs.add_if_new``
    Returns: the ref name that HEAD should be set to
    Raises:
      ValueError: if ``branch`` names neither a remote branch nor a local tag,
        or if both ``branch`` and ``origin_head`` are empty
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"

    if branch:
        remote_branch_ref = remote_prefix + branch
        if remote_branch_ref in refs:
            # Create the local branch from its remote-tracking counterpart.
            local_ref = LOCAL_BRANCH_PREFIX + branch
            refs.add_if_new(local_ref, refs[remote_branch_ref], ref_message)
            return local_ref
        if LOCAL_TAG_PREFIX + branch in refs:
            return LOCAL_TAG_PREFIX + branch
        raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")

    if not origin_head:
        raise ValueError("neither origin_head nor branch are provided")

    if origin_head.startswith(LOCAL_BRANCH_PREFIX):
        source_ref = remote_prefix + origin_head[len(LOCAL_BRANCH_PREFIX) :]
    else:
        source_ref = origin_head
    # A missing source ref is tolerated: the head ref is still returned.
    with suppress(KeyError):
        refs.add_if_new(origin_head, refs[source_ref], ref_message)
    return origin_head
def _set_head(refs, head_ref, ref_message):
    """Update HEAD to refer to ``head_ref``.

    For a ref under LOCAL_TAG_PREFIX, HEAD is deleted and then set directly to
    the value the tag ref resolves to (detached HEAD). For any other ref, HEAD
    is made a symbolic ref to ``head_ref``.

    Args:
      refs: refs container to update
      head_ref: ref name HEAD should follow or be detached at
      ref_message: message passed to ``refs.set_if_equals``
    Returns: the value HEAD was set to, or None when the non-tag path raised
      KeyError
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            return None
        return target

    # detach HEAD at specified tag
    target = refs[head_ref]
    # NOTE(review): if refs[...] returns a sha rather than an object, this
    # branch never runs; ``obj.get_object(obj)`` also looks suspicious --
    # confirm against the refs container's contract.
    if isinstance(target, Tag):
        _cls, obj = target.object
        target = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, target, message=ref_message)
    return target
def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, bytes],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into ``refs_container``.

    Branches (refs under LOCAL_BRANCH_PREFIX) are imported beneath
    refs/remotes/<remote_name>; tags (refs under LOCAL_TAG_PREFIX) are
    imported beneath LOCAL_TAG_PREFIX. Peeled entries are dropped first.

    Args:
      refs_container: refs container receiving the imported refs
      remote_name: name of the remote; encoded to bytes for the ref path
      refs: mapping of full ref name to value as fetched from the remote.
        Keys must be bytes: they are matched against bytes prefixes.
      message: message passed through to ``import_refs``
      prune: passed as ``prune`` for the branch import
      prune_tags: passed as ``prune`` for the tag import
    """
    stripped_refs = strip_peeled_refs(refs)
    branches = {
        n[len(LOCAL_BRANCH_PREFIX) :]: v
        for (n, v) in stripped_refs.items()
        if n.startswith(LOCAL_BRANCH_PREFIX)
    }
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        branches,
        message=message,
        prune=prune,
    )
    # strip_peeled_refs() already removed every name ending in
    # PEELED_TAG_SUFFIX, so no further suffix filtering is needed here.
    tags = {
        n[len(LOCAL_TAG_PREFIX) :]: v
        for (n, v) in stripped_refs.items()
        if n.startswith(LOCAL_TAG_PREFIX)
    }
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )
def serialize_refs(store, refs):
    """Build a ref-name-to-object-id dict, adding peeled entries for tags.

    Each sha in ``refs`` is peeled via ``store``. Refs whose sha cannot be
    found (KeyError) are skipped with a UserWarning. When the unpeeled object
    is a Tag, an extra entry keyed by the ref name plus PEELED_TAG_SUFFIX is
    added with the peeled object's id.

    Args:
      store: object store used to peel the shas
      refs: mapping of ref name to sha
    Returns: dict mapping ref names (and peeled ref names) to object ids
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for ref_name, ref_sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, ref_sha)
        except KeyError:
            missing_msg = "ref {} points at non-present sha {}".format(
                ref_name.decode("utf-8", "replace"), ref_sha.decode("ascii")
            )
            warnings.warn(missing_msg, UserWarning)
            continue
        if isinstance(unpeeled, Tag):
            serialized[ref_name + PEELED_TAG_SUFFIX] = peeled.id
        serialized[ref_name] = unpeeled.id
    return serialized