Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as public by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
23"""Ref handling."""
25import os
26import warnings
27from collections.abc import Iterator
28from contextlib import suppress
29from typing import Any, Optional, Union
31from .errors import PackedRefsException, RefFormatError
32from .file import GitFile, ensure_dir_exists
33from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
34from .pack import ObjectContainer
# A ref name is a plain byte string.
Ref = bytes

# Name of the HEAD ref and the marker that introduces a symbolic ref value.
HEADREF = b"HEAD"
SYMREF = b"ref: "

# Prefixes of the well-known ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"

# Byte values (as ints) that check_ref_format rejects anywhere in a ref name.
BAD_REF_CHARS = set(b"\177 ~^:?*[")

# Suffix appended to a ref name to denote its peeled value.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    The constructor arguments are kept as the ``ref`` and ``depth``
    attributes (and, as for any exception, in ``args``).
    """

    def __init__(self, ref, depth) -> None:
        # Passing the same arguments on leaves ``args`` as (ref, depth).
        super().__init__(ref, depth)
        self.depth = depth
        self.ref = ref
def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target of a symbolic ref.

    Args:
      contents: Raw ref contents, expected to start with ``SYMREF``
    Returns: The text after the ``SYMREF`` marker, with trailing CR/LF
      characters removed
    Raises:
      ValueError: If ``contents`` does not start with ``SYMREF``
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")
def check_ref_format(refname: Ref) -> bool:
    """Check if a refname is correctly formatted.

    Applies the following rules from git-check-ref-format[1]:

    * the name must contain at least one ``/``;
    * no slash-separated component may begin with ``.`` or end with
      ``.lock``;
    * the name may not contain ``..``, ``@{`` or a backslash;
    * the name may not end with ``/`` or ``.``;
    * the name may not contain bytes below 0o40 or any byte listed in
      ``BAD_REF_CHARS``.

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # The checks are listed separately to parallel [1]; every failure returns
    # False, so their relative order does not affect the result.
    if b"/" not in refname:
        return False
    if refname.startswith(b".") or b"/." in refname:
        return False
    if b".." in refname:
        return False
    # refname is non-empty here because it contains at least one slash.
    if refname.endswith((b"/", b".")):
        return False
    if b"@{" in refname:
        return False
    if b"\\" in refname:
        return False
    # The ".lock" rule applies to every component, not only the last one.
    if any(component.endswith(b".lock") for component in refname.split(b"/")):
        return False
    # Iterating bytes yields ints, which is also what BAD_REF_CHARS holds.
    for c in refname:
        if c < 0o40 or c in BAD_REF_CHARS:
            return False
    return True
def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); everything after the first slash
      that follows the remote name belongs to the branch name

    Raises:
      ValueError: If ref does not start with ``LOCAL_REMOTE_PREFIX`` or has
        no slash after the remote name
    """
    prefix = LOCAL_REMOTE_PREFIX
    if ref.startswith(prefix):
        # Cut at the first slash after the prefix.
        remote_name, slash, branch_name = ref[len(prefix) :].partition(b"/")
        if slash:
            return (remote_name, branch_name)
        raise ValueError(f"Invalid remote ref format: {ref!r}")
    raise ValueError(f"Not a remote ref: {ref!r}")
class RefsContainer:
    """A container for refs.

    This base class implements the mapping-style helpers (``__getitem__``,
    ``__setitem__``, ``keys``, ``as_dict``, ...) on top of a small set of
    storage primitives. The primitives (``allkeys``, ``read_loose_ref``,
    ``get_packed_refs``, ``set_if_equals``, ``add_if_new``,
    ``remove_if_equals``, ...) raise ``NotImplementedError`` here and are
    provided by subclasses.
    """

    def __init__(self, logger=None) -> None:
        """Create the container.

        Args:
          logger: Optional callable; ``_log`` invokes it as
            ``logger(ref, old_sha, new_sha, committer, timestamp, timezone,
            message)`` for changes that carry a message.
        """
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Forward a ref change to the logger.

        Nothing is logged when no logger was configured or when no message
        was given.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not to point
            to a tag, this will be the SHA the ref refers to. If the ref may
            point to a tag, but no cached information is available, None is
            returned. This base implementation has no cache and always
            returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, ObjectID],
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import a set of refs underneath a base.

        Args:
          base: Base the imported names are joined onto with a ``/``
          other: Mapping of names (relative to base) to SHAs; a value of None
            requests removal of that ref
          committer: Accepted for interface compatibility; not forwarded
          timestamp: Accepted for interface compatibility; not forwarded
          timezone: Accepted for interface compatibility; not forwarded
          message: Message passed on to the individual set/remove operations
          prune: If True, also remove refs currently under base that are not
            set by ``other``
        """
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
            else:
                self.set_if_equals(
                    b"/".join((base, name)), None, value, message=message
                )
                # A ref that was just written must survive pruning.
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(b"/".join((base, ref)), None, message=message)

    def allkeys(self) -> Iterator[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        """Iterate over the names returned by ``allkeys``."""
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is None:
            return self.allkeys()
        return self.subkeys(base)

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; a trailing slash is optional.
            An empty base selects every ref.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        if not base:
            prefix = b""
        elif base.endswith(b"/"):
            prefix = base
        else:
            # Only match at a component boundary, so that a base of
            # b"refs/heads" does not pick up b"refs/headsfoo/...".
            prefix = base + b"/"
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base=None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Args:
          base: An optional base; when given, the returned keys are relative
            to it.
        Returns: Dictionary of ref name to the resolved value; refs that
            cannot be resolved are left out.
        """
        ret = {}
        keys = self.keys(base)
        base = b"" if base is None else base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        The loose ref is consulted first; the packed refs are only used when
        no loose value was found.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name) -> bytes:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name) -> tuple[list[bytes], bytes]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), where refnames are the names of
            references in the chain; sha is None when the chain ends at a ref
            that does not exist.
        Raises:
          SymrefLoop: if more than five refs in the chain were read
            successfully.
        """
        # Treat the starting name as if it were the target of a symref.
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        """Return True if reading ``refname`` yields a non-empty value."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference cannot be resolved to a value.
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref value.
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            contents = self.read_ref(src)
            if contents is None:
                # Listed but unreadable, so it cannot be a symref.
                continue
            try:
                dst = parse_symref_value(contents)
            except ValueError:
                # Not a symbolic ref.
                pass
            else:
                ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)
class DictRefsContainer(RefsContainer):
    """RefsContainer backed by a simple dict.

    This container does not support symbolic or packed references and is not
    threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        """Wrap ``refs``, the mapping used as backing store (not copied)."""
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self):
        """Return the names held by the backing mapping."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the stored value for ``name``, or None if absent."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container keeps no packed refs."""
        return {}

    def _notify(self, ref, newsha) -> None:
        """Pass a ``(ref, newsha)`` pair to every registered watcher."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Store ``SYMREF + other`` under ``name``, then notify and log."""
        # Value the name resolved to before the change; used as old value
        # in the log entry.
        previous = self.follow(name)[-1]
        target = SYMREF + other
        self._refs[name] = target
        self._notify(name, target)
        self._log(
            name,
            previous,
            target,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set ``name`` to ``new_ref`` if it currently equals ``old_ref``.

        A missing ref compares as ``ZERO_SHA``; ``old_ref`` of None sets
        unconditionally. Returns True on success, False on mismatch.
        """
        if old_ref is not None:
            if self._refs.get(name, ZERO_SHA) != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Store ``ref`` under ``name`` unless the name already exists.

        Returns True if the ref was added, False if it was already present.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove ``name`` if it currently equals ``old_ref``.

        A missing ref compares as ``ZERO_SHA``; ``old_ref`` of None removes
        unconditionally. Removing a name that is not stored is a successful
        no-op. Returns False only on a value mismatch.
        """
        if old_ref is not None:
            if self._refs.get(name, ZERO_SHA) != old_ref:
                return False
        try:
            previous = self._refs.pop(name)
        except KeyError:
            # Nothing stored under that name: no notification, no log entry.
            return True
        self._notify(name, None)
        self._log(
            name,
            previous,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def get_peeled(self, name):
        """Return the recorded peeled value for ``name``, or None."""
        return self._peeled.get(name)

    def _update(self, refs) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, value in refs.items():
            self.set_if_equals(refname, None, value)

    def _update_peeled(self, peeled) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f) -> None:
        """Load the refs from ``f``.

        Args:
          f: File-like object with the info/refs contents; it is parsed with
            ``read_info_refs`` and the result is divided into regular and
            peeled refs with ``split_peeled_refs``.
        """
        # Run the base initializer so that the attributes it sets up (the
        # logger used by ``_log``) exist on this instance as well.
        super().__init__()
        refs = read_info_refs(f)
        self._refs, self._peeled = split_peeled_refs(refs)

    def allkeys(self):
        """Return the names of the regular refs that were loaded."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the loaded value for ``name``, or None if absent."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container keeps no packed refs."""
        return {}

    def get_peeled(self, name):
        """Return the peeled value for ``name``.

        Falls back to the ref's own value when no peeled entry was loaded.

        Raises:
          KeyError: if ``name`` is not a known ref.
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]
class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk."""

    def __init__(
        self,
        path: Union[str, bytes, os.PathLike],
        worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
        logger=None,
    ) -> None:
        """Create the container.

        Args:
          path: Directory under which the refs and the packed-refs file are
            looked up
          worktree_path: Directory under which HEAD is looked up; defaults
            to ``path``
          logger: Optional logger callable, passed to the base class
        """
        super().__init__(logger=logger)
        # Convert path-like objects to strings, then to bytes for Git compatibility
        self.path = os.fsencode(os.fspath(path))
        if worktree_path is None:
            self.worktree_path = self.path
        else:
            self.worktree_path = os.fsencode(os.fspath(worktree_path))
        # Both caches stay None until get_packed_refs() fills them.
        self._packed_refs = None
        self._peeled_refs = None

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.path!r})"

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under.
        Returns: A set of ref names relative to the base, collected from the
            loose ref files found below the base and from the packed refs.
        """
        subkeys = set()
        path = self.refpath(base)
        for root, unused_dirs, files in os.walk(path):
            # Directory of the current files, relative to the base.
            rel_dir = root[len(path) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            rel_dir = rel_dir.strip(b"/")
            for filename in files:
                refname = b"/".join(([rel_dir] if rel_dir else []) + [filename])
                # check_ref_format requires at least one /, so we prepend the
                # base before calling it.
                if check_ref_format(base + b"/" + refname):
                    subkeys.add(refname)
        for key in self.get_packed_refs():
            if key.startswith(base):
                subkeys.add(key[len(base) :].strip(b"/"))
        return subkeys

    def allkeys(self):
        """All refs present in this container.

        Returns: A set with HEAD (if its file exists), every validly named
            file below the refs directory, and every packed ref.
        """
        allkeys = set()
        if os.path.exists(self.refpath(HEADREF)):
            allkeys.add(HEADREF)
        path = self.refpath(b"")
        refspath = self.refpath(b"refs")
        for root, unused_dirs, files in os.walk(refspath):
            # Directory of the current files, relative to the control dir.
            rel_dir = root[len(path) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            for filename in files:
                refname = b"/".join([rel_dir, filename])
                if check_ref_format(refname):
                    allkeys.add(refname)
        allkeys.update(self.get_packed_refs())
        return allkeys

    def refpath(self, name):
        """Return the disk path of a ref."""
        if os.path.sep != "/":
            name = name.replace(b"/", os.fsencode(os.path.sep))
        # TODO: as the 'HEAD' reference is working tree specific, it
        # should actually not be a part of RefsContainer
        if name == HEADREF:
            return os.path.join(self.worktree_path, name)
        else:
            return os.path.join(self.path, name)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        The file is parsed on the first call and the result is cached on the
        instance; the peeled values found in it are cached alongside.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        # TODO: invalidate the cache on repacking
        if self._packed_refs is None:
            # set both to empty because we want _peeled_refs to be
            # None if and only if _packed_refs is also None.
            self._packed_refs = {}
            self._peeled_refs = {}
            path = os.path.join(self.path, b"packed-refs")
            try:
                f = GitFile(path, "rb")
            except FileNotFoundError:
                return {}
            with f:
                # An empty file has no first line; treat it as having no
                # header instead of letting StopIteration escape.
                first_line = next(iter(f), b"").rstrip()
                if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
                    for sha, name, peeled in read_packed_refs_with_peeled(f):
                        self._packed_refs[name] = sha
                        if peeled:
                            self._peeled_refs[name] = peeled
                else:
                    f.seek(0)
                    for sha, name in read_packed_refs(f):
                        self._packed_refs[name] = sha
        return self._packed_refs

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        The loose file of every ref named in ``new_refs`` is removed.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        Raises:
          ValueError: if ``new_refs`` contains HEAD
        """
        if not new_refs:
            return

        path = os.path.join(self.path, b"packed-refs")

        with GitFile(path, "wb") as f:
            # Work on copies of the packed refs as returned by
            # get_packed_refs() (which may be served from the cache), so the
            # cache is only replaced once the new file has been written.
            packed_refs = self.get_packed_refs().copy()
            peeled_refs = dict(self._peeled_refs)

            for ref, target in new_refs.items():
                # sanity check
                if ref == HEADREF:
                    raise ValueError("cannot pack HEAD")

                # remove any loose refs pointing to this one -- please
                # note that this bypasses remove_if_equals as we don't
                # want to affect packed refs in here
                with suppress(OSError):
                    os.remove(self.refpath(ref))

                if packed_refs.get(ref) != target:
                    # The cached peeled value belonged to the previous
                    # target and must not be written for the new one.
                    peeled_refs.pop(ref, None)

                if target is not None:
                    packed_refs[ref] = target
                else:
                    packed_refs.pop(ref, None)

            write_packed_refs(f, packed_refs, peeled_refs)

            self._packed_refs = packed_refs
            self._peeled_refs = peeled_refs

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not to point
            to a tag, this will be the SHA the ref refers to. If the ref may
            point to a tag, but no cached information is available, None is
            returned.
        """
        self.get_packed_refs()
        if self._peeled_refs is None or name not in self._packed_refs:
            # No cache: no peeled refs were read, or this ref is loose
            return None
        if name in self._peeled_refs:
            return self._peeled_refs[name]
        else:
            # Known not peelable
            return self[name]

    def read_loose_ref(self, name):
        """Read a reference file and return its contents.

        If the reference file is a symbolic reference, only read the first
        line of the file. Otherwise, only read the first 40 bytes.

        Args:
          name: the refname to read, relative to refpath
        Returns: The contents of the ref file, or None if the file cannot be
            read (for example because it does not exist).
        """
        filename = self.refpath(name)
        try:
            with GitFile(filename, "rb") as f:
                header = f.read(len(SYMREF))
                if header == SYMREF:
                    # Read only the first line; a file that ends right after
                    # the marker yields an empty target rather than raising
                    # StopIteration.
                    return header + next(iter(f), b"").rstrip(b"\r\n")
                else:
                    # Read only the first 40 bytes
                    return header + f.read(40 - len(SYMREF))
        except (OSError, UnicodeError):
            # don't assume anything specific about the error; in
            # particular, invalid or forbidden paths can raise weird
            # errors depending on the specific operating system
            return None

    def _remove_packed_ref(self, name) -> None:
        """Remove ``name`` from the packed-refs file, if it is listed there.

        Does nothing when the packed refs have never been loaded.
        """
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            self._packed_refs = None
            self.get_packed_refs()

            if name not in self._packed_refs:
                return

            del self._packed_refs[name]
            with suppress(KeyError):
                del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        finally:
            # NOTE(review): also runs after a successful close(); presumably
            # abort() is a no-op on an already closed GitFile - confirm.
            f.abort()

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message to describe the change
        Raises:
          RefFormatError: if either name fails ``_check_refname``
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, "wb")
        try:
            f.write(SYMREF + other + b"\n")
            # The same resolved value is logged as both old and new sha.
            sha = self.follow(name)[-1]
            self._log(
                name,
                sha,
                sha,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        except BaseException:
            f.abort()
            raise
        else:
            f.close()

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.
        Raises:
          NotADirectoryError: if a parent of the ref name is itself a packed
            ref.
        """
        self._check_refname(name)
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock to handle race conditions
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise

            # Check if ref already has the desired value while holding the lock
            # This avoids fsync when ref is unchanged but still detects lock conflicts
            current_ref = self.read_loose_ref(realname)
            if current_ref is None:
                current_ref = packed_refs.get(realname, None)

            if current_ref is not None and current_ref == new_ref:
                # Ref already has desired value, abort write to avoid fsync
                f.abort()
                return True

            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        This method follows symrefs, and only ensures that the last ref in the
        chain does not exist.

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
          message: Optional message for reflog
        Returns: True if the add was successful, False otherwise.
        """
        try:
            realnames, contents = self.follow(name)
            if contents is not None:
                return False
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        self._check_refname(realname)
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            # Check again now that the lock file has been opened.
            if os.path.exists(filename) or name in self.get_packed_refs():
                f.abort()
                return False
            try:
                f.write(ref + b"\n")
            except OSError:
                f.abort()
                raise
            else:
                self._log(
                    name,
                    None,
                    ref,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Optional message
        Returns: True if the delete was successful, False otherwise.
        """
        self._check_refname(name)
        filename = self.refpath(name)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        try:
            if old_ref is not None:
                orig_ref = self.read_loose_ref(name)
                if orig_ref is None:
                    orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
                if orig_ref != old_ref:
                    return False

            # remove the reference file itself
            try:
                found = os.path.lexists(filename)
            except OSError:
                # may only be packed, or otherwise unstorable
                found = False

            if found:
                os.remove(filename)

            self._remove_packed_ref(name)
            self._log(
                name,
                old_ref,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        finally:
            # never write, we just wanted the lock
            f.abort()

        # outside of the lock, clean-up any parent directory that might now
        # be empty. this ensures that re-creating a reference of the same
        # name of what was previously a directory works as expected
        parent = name
        while True:
            try:
                parent, _ = parent.rsplit(b"/", 1)
            except ValueError:
                break

            if parent == b"refs":
                break
            parent_filename = self.refpath(parent)
            try:
                os.rmdir(parent_filename)
            except OSError:
                # this can be caused by the parent directory being
                # removed by another process, being not empty, etc.
                # in any case, this is non fatal because we already
                # removed the reference, just ignore it
                break

        return True

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
        for ref in self.allkeys():
            if ref == HEADREF:
                # Never pack HEAD
                continue
            if all or ref.startswith(LOCAL_TAG_PREFIX):
                try:
                    sha = self[ref]
                except (KeyError, SymrefLoop):
                    # Broken ref, skip it
                    continue
                if sha:
                    refs_to_pack[ref] = sha

        if refs_to_pack:
            self.add_packed_refs(refs_to_pack)
def _split_ref_line(line):
    """Split a single ref line into a tuple of SHA1 and name.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, or the SHA or the ref name is invalid.
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    sha = parts[0]
    name = parts[1]
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(name):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (sha, name)
def read_packed_refs(f):
    """Read a packed refs file that carries no peeled entries.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a peeled ("^") line is encountered or a ref
        line is malformed.
    """
    for entry in f:
        marker = entry[:1]
        if marker == b"#":
            # Comment
            continue
        if marker == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(entry)
def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of (SHA1, ref name, peeled SHA1 or None). Comment lines (starting
    with "#") are skipped.

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled ("^") line does not directly follow a
        ref line, carries an invalid SHA, or a ref line is malformed.
    """
    # A ref line is held back until the next line shows whether a peeled
    # value belongs to it.
    pending = None
    for line in f:
        # Indexing bytes yields an int, so compare by prefix instead of
        # testing line[0] against b"#".
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not pending:
                raise PackedRefsException("unexpected peeled ref line")
            peeled = line[1:]
            if not valid_hexsha(peeled):
                raise PackedRefsException(f"Invalid hex sha {peeled!r}")
            sha, name = _split_ref_line(pending)
            pending = None
            yield (sha, name, peeled)
        else:
            if pending:
                sha, name = _split_ref_line(pending)
                yield (sha, name, None)
            pending = line
    if pending:
        sha, name = _split_ref_line(pending)
        yield (sha, name, None)
def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
    """Serialize packed refs to a file-like object.

    The "peeled" header line is written whenever ``peeled_refs`` is not
    None, even if it is empty.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    else:
        peeled_refs = {}

    for name in sorted(packed_refs.keys()):
        sha = packed_refs[name]
        f.write(git_line(sha, name))
        if name not in peeled_refs:
            continue
        f.write(b"^" + peeled_refs[name] + b"\n")
def read_info_refs(f):
    """Parse tab-separated ``<sha> TAB <refname>`` lines into a dict.

    Args:
      f: file-like object providing ``readlines()`` that returns bytes lines
    Returns: dict mapping ref name to sha; a later line for the same name
      overrides an earlier one.
    """
    fields = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {name: sha for sha, name in fields}
def write_info_refs(refs, store: ObjectContainer):
    """Generate the lines of an info/refs listing.

    Refs are emitted in sorted order. HEAD and refs whose sha is missing
    from ``store`` are skipped. When a ref peels to a different object, an
    extra line carrying the peeled id and the peeled-tag suffix follows it.

    Args:
      refs: dict of ref name to sha
      store: object container used to look up and peel objects
    Returns: Iterator over bytes lines of the form ``<id> TAB <name> LF``.
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for ref_name, ref_sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but it should not be
        # advertised.
        if ref_name == HEADREF:
            continue
        try:
            obj = store[ref_sha]
        except KeyError:
            continue
        _unpeeled, peeled_obj = peel_sha(store, ref_sha)
        yield obj.id + b"\t" + ref_name + b"\n"
        if obj.id != peeled_obj.id:
            yield peeled_obj.id + b"\t" + ref_name + PEELED_TAG_SUFFIX + b"\n"
def is_local_branch(x):
    """Return whether ref name ``x`` starts with the local branch prefix."""
    branch_prefix = LOCAL_BRANCH_PREFIX
    return x.startswith(branch_prefix)
def strip_peeled_refs(refs):
    """Return a copy of ``refs`` without entries carrying the peeled suffix."""
    kept = {}
    for name, sha in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = sha
    return kept
def split_peeled_refs(refs):
    """Partition ``refs`` into regular refs and peeled refs.

    Returns: tuple ``(regular, peeled)``; keys of ``peeled`` have the
      peeled-tag suffix removed.
    """
    regular, peeled = {}, {}
    for name, sha in refs.items():
        if not name.endswith(PEELED_TAG_SUFFIX):
            regular[name] = sha
            continue
        base_name = name[: -len(PEELED_TAG_SUFFIX)]
        peeled[base_name] = sha
    return regular, peeled
1258def _set_origin_head(refs, origin, origin_head) -> None:
1259 # set refs/remotes/origin/HEAD
1260 origin_base = b"refs/remotes/" + origin + b"/"
1261 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
1262 origin_ref = origin_base + HEADREF
1263 target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
1264 if target_ref in refs:
1265 refs.set_symbolic_ref(origin_ref, target_ref)
def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Choose the ref HEAD should follow and create it if needed.

    An explicit ``branch`` wins: it is resolved first as a remote-tracking
    branch of ``origin`` (creating the matching local branch if new), then
    as a local tag. Without ``branch``, ``origin_head`` is used.

    Returns: the chosen head ref name.
    Raises:
      ValueError: if ``branch`` matches neither a remote branch nor a tag,
        or if both ``branch`` and ``origin_head`` are empty.
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"

    if branch:
        tracking_ref = remote_prefix + branch
        if tracking_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            refs.add_if_new(local_ref, refs[tracking_ref], ref_message)
            return local_ref
        tag_ref = LOCAL_TAG_PREFIX + branch
        if tag_ref in refs:
            return tag_ref
        raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")

    if not origin_head:
        raise ValueError("neither origin_head nor branch are provided")

    if origin_head.startswith(LOCAL_BRANCH_PREFIX):
        source_ref = remote_prefix + origin_head[len(LOCAL_BRANCH_PREFIX) :]
    else:
        source_ref = origin_head
    try:
        refs.add_if_new(origin_head, refs[source_ref], ref_message)
    except KeyError:
        # Source ref is missing; the head ref name is still returned.
        pass
    return origin_head
def _set_head(refs, head_ref, ref_message):
    """Update HEAD to follow ``head_ref``.

    A ref under the tag prefix detaches HEAD at the tag's value; any other
    ref makes HEAD a symbolic ref to it.

    Returns: the value HEAD was set to, or None when a KeyError occurred
      while setting HEAD to a branch.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            target = None
        return target

    # detach HEAD at specified tag
    target = refs[head_ref]
    # NOTE(review): assumes refs[...] may return a Tag object here; if it
    # only ever returns a sha this branch never runs. Confirm.
    if isinstance(target, Tag):
        _cls, obj = target.object
        target = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, target, message=ref_message)
    return target
def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[str, str],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into ``refs_container``.

    Branches are imported below ``refs/remotes/<remote_name>`` and tags
    below the local tag prefix. Peeled entries are dropped first.

    Args:
      refs_container: container receiving the imported refs
      remote_name: name of the remote; encoded to bytes for the ref path
      refs: mapping of ref name to sha; names are matched against bytes
        prefixes
      message: optional message passed through to ``import_refs``
      prune: passed to ``import_refs`` for branches
      prune_tags: passed to ``import_refs`` as ``prune`` for tags
    """
    unpeeled = strip_peeled_refs(refs)

    branches = {}
    for name, value in unpeeled.items():
        if name.startswith(LOCAL_BRANCH_PREFIX):
            branches[name[len(LOCAL_BRANCH_PREFIX) :]] = value
    remote_base = b"refs/remotes/" + remote_name.encode()
    refs_container.import_refs(
        remote_base,
        branches,
        message=message,
        prune=prune,
    )

    tags = {}
    for name, value in unpeeled.items():
        if not name.startswith(LOCAL_TAG_PREFIX):
            continue
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        tags[name[len(LOCAL_TAG_PREFIX) :]] = value
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )
def serialize_refs(store, refs):
    """Build a ref-to-id mapping that includes peeled entries for tags.

    Refs whose sha cannot be peeled because it is missing from ``store``
    are skipped with a ``UserWarning``.

    Args:
      store: object store used to peel each sha
      refs: dict of ref name to sha
    Returns: dict of ref name to object id, plus a
      ``<ref> + PEELED_TAG_SUFFIX`` entry for each ref pointing at a Tag.
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for name, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            text = "ref {} points at non-present sha {}".format(
                name.decode("utf-8", "replace"), sha.decode("ascii")
            )
            warnings.warn(text, UserWarning)
            continue
        if isinstance(unpeeled, Tag):
            serialized[name + PEELED_TAG_SUFFIX] = peeled.id
        serialized[name] = unpeeled.id
    return serialized
1372 return ret