Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/refs.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# refs.py -- For dealing with git refs
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
5# General Public License as published by the Free Software Foundation; version 2.0
6# or (at your option) any later version. You can redistribute it and/or
7# modify it under the terms of either of these two licenses.
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15# You should have received a copy of the licenses; if not, see
16# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
17# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
18# License, Version 2.0.
19#
22"""Ref handling."""
24import os
25import warnings
26from contextlib import suppress
27from typing import Any, Dict, Optional, Set
29from .errors import PackedRefsException, RefFormatError
30from .file import GitFile, ensure_dir_exists
31from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
32from .pack import ObjectContainer
# Ref names are handled as byte strings throughout this module.
Ref = bytes

# The HEAD ref; DiskRefsContainer stores it relative to the worktree path.
HEADREF = b"HEAD"
# Marker that opens the contents of a symbolic ref.
SYMREF = b"ref: "
# Name prefixes for local branches, tags and remote-tracking refs.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
# Byte values that check_ref_format() rejects anywhere in a ref name
# (DEL, space, and the characters ~ ^ : ? * [).
BAD_REF_CHARS = set(b"\x7f ~^:?*[")
# Suffix that marks the peeled entry of a tag ref.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    The constructor arguments are kept on the instance as ``ref`` and
    ``depth`` so that handlers can inspect them.
    """

    def __init__(self, ref, depth) -> None:
        self.ref, self.depth = ref, depth
def parse_symref_value(contents):
    """Extract the destination from the raw contents of a symref.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The text following the marker, with trailing CR/LF removed
    Raises:
      ValueError: if contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF) :]
    return destination.rstrip(b"\r\n")
def check_ref_format(refname: Ref):
    """Check if a refname is correctly formatted.

    The checks follow the rules listed in git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # The rules are kept as separate statements so that they can be compared
    # one by one with [1].

    # No path component may start with a dot.
    if refname.startswith(b".") or b"/." in refname:
        return False
    # At least one slash is required.
    if b"/" not in refname:
        return False
    # No two consecutive dots anywhere.
    if b".." in refname:
        return False
    # No ASCII control characters and none of the explicitly banned bytes.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # Must not end with a slash or a dot (refname is non-empty here because
    # it contains at least one slash).
    if refname.endswith((b"/", b".")):
        return False
    # Remaining rules: no ".lock" ending, no "@{" sequence, no backslash.
    return not (refname.endswith(b".lock") or b"@{" in refname or b"\\" in refname)
class RefsContainer:
    """A container for refs.

    This base class provides symref following, key listing and
    dictionary-style access. Subclasses supply the storage primitives that
    raise NotImplementedError here.
    """

    def __init__(self, logger=None) -> None:
        # Optional callable that _log() invokes for each recorded change.
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Pass a ref change on to the logger.

        Nothing happens when no logger is configured or when message is None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: Dict[Ref, Optional[ObjectID]]):
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: Dict[Ref, ObjectID],
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ):
        """Import a set of refs underneath a common base.

        Args:
          base: Ref name prefix to import under; it is joined to each name
            with a single slash
          other: Mapping of ref names (relative to base) to SHAs; a value of
            None requests removal of that ref
          committer: Currently not forwarded to the underlying calls
          timestamp: Currently not forwarded to the underlying calls
          timezone: Currently not forwarded to the underlying calls
          message: Optional message passed to the set/remove calls
          prune: If True, refs already under base that are not set by this
            call are removed
        """
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
            else:
                self.set_if_equals(
                    b"/".join((base, name)), None, value, message=message
                )
                # A ref that was just written must survive pruning.
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(b"/".join((base, ref)), None, message=message)

    def allkeys(self):
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is None:
            return self.allkeys()
        return self.subkeys(base)

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under. A trailing slash is optional.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        # Match on "base/" so that a base of b"refs/heads" does not pick up
        # b"refs/headsfoo/x", and so that a base given with a trailing slash
        # does not lose the first character of every name.
        prefix = base.rstrip(b"/") + b"/" if base else b""
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base=None):
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out of the result.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name):
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            # Fall back to the packed refs when there is no loose ref.
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name):
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name):
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        Raises:
          SymrefLoop: if more than five symrefs had to be followed
        """
        # Start as if `name` itself had been read from a symref.
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        return bool(self.read_ref(refname))

    def __getitem__(self, name):
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference does not resolve to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ):
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        """
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            contents = self.read_ref(src)
            if contents is None:
                # read_ref() found nothing for this key, so there is no
                # value to parse.
                continue
            try:
                dst = parse_symref_value(contents)
            except ValueError:
                # Not a symref.
                pass
            else:
                ret[src] = dst
        return ret
class DictRefsContainer(RefsContainer):
    """RefsContainer backed by a simple dict.

    This container does not support symbolic or packed references and is not
    threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        super().__init__(logger=logger)
        # The caller's mapping is used directly, not copied.
        self._refs = refs
        self._peeled: Dict[bytes, ObjectID] = {}
        # Objects whose _notify() is called on every change.
        self._watchers: Set[Any] = set()

    def allkeys(self):
        """Return the keys of the backing mapping."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the stored value for name, or None if it is absent."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container has no packed refs."""
        return {}

    def _notify(self, ref, newsha):
        """Tell every registered watcher about a (ref, newsha) change."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Store ``name`` as a symref to ``other``, then notify and log."""
        # The value name resolved to before the change is what gets logged
        # as the old value.
        _, previous = self.follow(name)
        symref_value = SYMREF + other
        self._refs[name] = symref_value
        self._notify(name, symref_value)
        self._log(
            name,
            previous,
            symref_value,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Set name to new_ref if its stored value equals old_ref.

        Every ref name in the chain returned by follow() is updated.

        Returns: True if the set was performed, False if old_ref did not match.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        chain, _ = self.follow(name)
        for target in chain:
            self._check_refname(target)
            previous = self._refs.get(target)
            self._refs[target] = new_ref
            self._notify(target, new_ref)
            self._log(
                target,
                previous,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ):
        """Store ref under name unless name is already present.

        Returns: True if the ref was added, False if it already existed.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Remove name if its stored value equals old_ref.

        Returns: False if old_ref was given and did not match; True otherwise,
            including when name was not present at all.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            removed = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                removed,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name):
        """Return the cached peeled value for name, or None."""
        return self._peeled.get(name)

    def _update(self, refs):
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname in refs:
            self.set_if_equals(refname, None, refs[refname])

    def _update_peeled(self, peeled):
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)
class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f) -> None:
        """Parse an info/refs file.

        Args:
          f: binary file-like object with one "<sha>\\t<name>" entry per line;
            names ending in the peeled-tag suffix are stored as peeled values
        Raises:
          ValueError: if a line is malformed or a ref name is invalid
        """
        # Set up base-class state (the logger attribute).
        super().__init__()
        self._refs = {}
        self._peeled = {}
        for line in f.readlines():
            # Same line handling as read_info_refs(): tolerate CRLF endings
            # and split on the first tab only.
            sha, name = line.rstrip(b"\r\n").split(b"\t", 1)
            is_peeled = name.endswith(PEELED_TAG_SUFFIX)
            if is_peeled:
                name = name[: -len(PEELED_TAG_SUFFIX)]
            if not check_ref_format(name):
                raise ValueError(f"invalid ref name {name!r}")
            if is_peeled:
                self._peeled[name] = sha
            else:
                self._refs[name] = sha

    def allkeys(self):
        """Return the names of all non-peeled refs that were read."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the SHA read for name, or None if it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; info/refs has no packed refs."""
        return {}

    def get_peeled(self, name):
        """Return the peeled value for name, falling back to its plain value.

        Raises:
          KeyError: if name is known neither as a peeled nor as a plain ref
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]
class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk."""

    def __init__(self, path, worktree_path=None, logger=None) -> None:
        """Create a container rooted at a directory.

        Args:
          path: Directory that holds ``refs/`` and ``packed-refs``; may be
            str, bytes or an os.PathLike object
          worktree_path: Directory that holds ``HEAD``; defaults to path
          logger: Optional callable used to record ref changes
        """
        super().__init__(logger=logger)
        # os.fsencode() leaves bytes untouched and converts str and
        # os.PathLike values, so all paths are bytes from here on.
        self.path = os.fsencode(path)
        if worktree_path is None:
            self.worktree_path = self.path
        else:
            self.worktree_path = os.fsencode(worktree_path)
        # Both caches are None until get_packed_refs() has run once.
        self._packed_refs = None
        self._peeled_refs = None

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.path!r})"

    def subkeys(self, base):
        """Return the ref names under base, loose and packed, relative to base."""
        subkeys = set()
        path = self.refpath(base)
        for root, unused_dirs, files in os.walk(path):
            rel_dir = root[len(path) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            rel_dir = rel_dir.strip(b"/")
            for filename in files:
                refname = b"/".join(([rel_dir] if rel_dir else []) + [filename])
                # check_ref_format requires at least one /, so we prepend the
                # base before calling it.
                if check_ref_format(base + b"/" + refname):
                    subkeys.add(refname)
        for key in self.get_packed_refs():
            if key.startswith(base):
                subkeys.add(key[len(base) :].strip(b"/"))
        return subkeys

    def allkeys(self):
        """Return every ref name in the container: HEAD, loose and packed."""
        allkeys = set()
        if os.path.exists(self.refpath(HEADREF)):
            allkeys.add(HEADREF)
        path = self.refpath(b"")
        refspath = self.refpath(b"refs")
        for root, unused_dirs, files in os.walk(refspath):
            # Strip the container root so that names start with "refs".
            rel_dir = root[len(path) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            for filename in files:
                refname = b"/".join([rel_dir, filename])
                if check_ref_format(refname):
                    allkeys.add(refname)
        allkeys.update(self.get_packed_refs())
        return allkeys

    def refpath(self, name):
        """Return the disk path of a ref."""
        if os.path.sep != "/":
            name = name.replace(b"/", os.fsencode(os.path.sep))
        # TODO: as the 'HEAD' reference is working tree specific, it
        # should actually not be a part of RefsContainer
        if name == HEADREF:
            return os.path.join(self.worktree_path, name)
        else:
            return os.path.join(self.path, name)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        # TODO: invalidate the cache on repacking
        if self._packed_refs is None:
            # set both to empty because we want _peeled_refs to be
            # None if and only if _packed_refs is also None.
            self._packed_refs = {}
            self._peeled_refs = {}
            path = os.path.join(self.path, b"packed-refs")
            try:
                f = GitFile(path, "rb")
            except FileNotFoundError:
                return {}
            with f:
                # An empty file has no first line; treat it as an empty
                # header instead of letting StopIteration escape.
                first_line = next(iter(f), b"").rstrip()
                if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
                    for sha, name, peeled in read_packed_refs_with_peeled(f):
                        self._packed_refs[name] = sha
                        if peeled:
                            self._peeled_refs[name] = peeled
                else:
                    # No peeled header: reparse from the start.
                    f.seek(0)
                    for sha, name in read_packed_refs(f):
                        self._packed_refs[name] = sha
        return self._packed_refs

    def add_packed_refs(self, new_refs: Dict[Ref, Optional[ObjectID]]):
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        Raises:
          ValueError: if new_refs contains HEAD
        """
        if not new_refs:
            return

        path = os.path.join(self.path, b"packed-refs")

        with GitFile(path, "wb") as f:
            # Work on a copy of the packed refs while holding the lock; the
            # cache is only replaced once the new file has been written.
            packed_refs = self.get_packed_refs().copy()

            for ref, target in new_refs.items():
                # sanity check
                if ref == HEADREF:
                    raise ValueError("cannot pack HEAD")

                # remove any loose refs pointing to this one -- please
                # note that this bypasses remove_if_equals as we don't
                # want to affect packed refs in here
                with suppress(OSError):
                    os.remove(self.refpath(ref))

                if target is not None:
                    packed_refs[ref] = target
                else:
                    packed_refs.pop(ref, None)

            write_packed_refs(f, packed_refs, self._peeled_refs)

            self._packed_refs = packed_refs

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        self.get_packed_refs()
        if self._peeled_refs is None or name not in self._packed_refs:
            # No cache: no peeled refs were read, or this ref is loose
            return None
        if name in self._peeled_refs:
            return self._peeled_refs[name]
        else:
            # Known not peelable
            return self[name]

    def read_loose_ref(self, name):
        """Read a reference file and return its contents.

        If the reference file a symbolic reference, only read the first line of
        the file. Otherwise, only read the first 40 bytes.

        Args:
          name: the refname to read, relative to refpath
        Returns: The contents of the ref file, or None if the file does not
            exist or cannot be read.
        """
        filename = self.refpath(name)
        try:
            with GitFile(filename, "rb") as f:
                header = f.read(len(SYMREF))
                if header == SYMREF:
                    # Read only the first line. The default covers a file
                    # that ends right after the marker.
                    return header + next(iter(f), b"").rstrip(b"\r\n")
                else:
                    # Read only the first 40 bytes
                    return header + f.read(40 - len(SYMREF))
        except (OSError, UnicodeError):
            # don't assume anything specific about the error; in
            # particular, invalid or forbidden paths can raise weird
            # errors depending on the specific operating system
            return None

    def _remove_packed_ref(self, name):
        """Rewrite packed-refs without name, if packed refs have been loaded."""
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            self._packed_refs = None
            self.get_packed_refs()

            if name not in self._packed_refs:
                return

            del self._packed_refs[name]
            with suppress(KeyError):
                del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        finally:
            # Runs on every path, including after the close() above and on
            # the early return.
            f.abort()

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message to describe the change
        Raises:
          RefFormatError: if name or other is not an acceptable ref name
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, "wb")
        try:
            f.write(SYMREF + other + b"\n")
            sha = self.follow(name)[-1]
            self._log(
                name,
                sha,
                sha,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        except BaseException:
            f.abort()
            raise
        else:
            f.close()

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.
        Raises:
          NotADirectoryError: if a parent of the ref name is itself a packed ref
        """
        self._check_refname(name)
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise
            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ):
        """Add a new reference only if it does not already exist.

        This method follows symrefs, and only ensures that the last ref in the
        chain does not exist.

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
          message: Optional message for reflog
        Returns: True if the add was successful, False otherwise.
        """
        try:
            realnames, contents = self.follow(name)
            if contents is not None:
                return False
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        self._check_refname(realname)
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            # Recheck under the lock. The existence test is about the ref
            # that will be written, so both halves use the resolved name.
            if os.path.exists(filename) or realname in self.get_packed_refs():
                f.abort()
                return False
            try:
                f.write(ref + b"\n")
            except OSError:
                f.abort()
                raise
            else:
                self._log(
                    name,
                    None,
                    ref,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Optional message
        Returns: True if the delete was successful, False otherwise.
        """
        self._check_refname(name)
        filename = self.refpath(name)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        try:
            if old_ref is not None:
                orig_ref = self.read_loose_ref(name)
                if orig_ref is None:
                    orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
                if orig_ref != old_ref:
                    return False

            # remove the reference file itself
            try:
                found = os.path.lexists(filename)
            except OSError:
                # may only be packed, or otherwise unstorable
                found = False

            if found:
                os.remove(filename)

            self._remove_packed_ref(name)
            self._log(
                name,
                old_ref,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        finally:
            # never write, we just wanted the lock
            f.abort()

        # outside of the lock, clean-up any parent directory that might now
        # be empty. this ensures that re-creating a reference of the same
        # name of what was previously a directory works as expected
        parent = name
        while True:
            try:
                parent, _ = parent.rsplit(b"/", 1)
            except ValueError:
                break

            if parent == b"refs":
                break
            parent_filename = self.refpath(parent)
            try:
                os.rmdir(parent_filename)
            except OSError:
                # this can be caused by the parent directory being
                # removed by another process, being not empty, etc.
                # in any case, this is non fatal because we already
                # removed the reference, just ignore it
                break

        return True
def _split_ref_line(line):
    """Split a single ref line into a tuple of SHA1 and name.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the first is not a valid hex SHA, or the
        second is not a valid ref name
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) == 2:
        sha, name = parts
    else:
        raise PackedRefsException(f"invalid ref line {line!r}")
    # Validate the SHA first, then the name.
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(name):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (sha, name)
def read_packed_refs(f):
    """Read a packed refs file.

    Lines starting with ``#`` are skipped. A line starting with ``^`` is an
    error because this reader does not handle peeled entries.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: on a peeled line or a malformed ref line
    """
    for raw in f:
        lead = raw[:1]
        if lead == b"#":
            # Comment
            continue
        if lead == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(raw)
def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of (SHA1, ref name, peeled SHA1 or None).

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled line has no preceding ref line, a
        peeled value is not a valid hex SHA, or a ref line is malformed
    """
    # A ref line is held back until the next line shows whether a peeled
    # value belongs to it.
    pending = None
    for line in f:
        # Indexing bytes yields an int, so compare with startswith().
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not pending:
                raise PackedRefsException("unexpected peeled ref line")
            peeled = line[1:]
            if not valid_hexsha(peeled):
                raise PackedRefsException(f"Invalid hex sha {peeled!r}")
            sha, name = _split_ref_line(pending)
            pending = None
            yield (sha, name, peeled)
        else:
            if pending:
                sha, name = _split_ref_line(pending)
                yield (sha, name, None)
            pending = line
    # Flush the final ref line, which had no peeled line after it.
    if pending:
        sha, name = _split_ref_line(pending)
        yield (sha, name, None)
def write_packed_refs(f, packed_refs, peeled_refs=None):
    """Write a packed refs file.

    The "peeled" header line is written whenever peeled_refs is not None,
    even if it is empty. Entries are written in sorted ref name order.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    peeled_lookup = {} if peeled_refs is None else peeled_refs
    for refname in sorted(packed_refs):
        f.write(git_line(packed_refs[refname], refname))
        if refname in peeled_lookup:
            # The peeled value follows its ref on a "^"-prefixed line.
            f.write(b"^" + peeled_lookup[refname] + b"\n")
def read_info_refs(f):
    """Parse an info/refs style file into a dictionary.

    Args:
      f: binary file-like object with one "<sha>\\t<name>" entry per line
    Returns: Dictionary mapping ref names to SHAs
    Raises:
      ValueError: if a line contains no tab
    """
    # Split on the first tab only; line endings are stripped beforehand.
    entries = (line.rstrip(b"\r\n").split(b"\t", 1) for line in f.readlines())
    return {name: sha for sha, name in entries}
def write_info_refs(refs, store: ObjectContainer):
    """Generate info refs.

    Yields one "<sha>\\t<name>\\n" line per ref, in sorted order, and a second
    line with the peeled-tag suffix when the ref peels to a different object.
    Refs whose SHA is not in the store are skipped.

    Args:
      refs: Dictionary mapping ref names to SHAs
      store: Object container used to look up and peel the SHAs
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for name, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _, peeled = peel_sha(store, sha)
        obj_id = obj.id
        yield b"".join((obj_id, b"\t", name, b"\n"))
        if peeled.id != obj_id:
            yield b"".join((peeled.id, b"\t", name, PEELED_TAG_SUFFIX, b"\n"))
def is_local_branch(x):
    """Return True if ref name ``x`` starts with the local branch prefix."""
    branch_prefix = LOCAL_BRANCH_PREFIX
    return x.startswith(branch_prefix)
def strip_peeled_refs(refs):
    """Remove all peeled refs.

    Returns: A new dictionary without the entries whose name ends in the
        peeled-tag suffix; the input is not modified.
    """
    kept = {}
    for ref, sha in refs.items():
        if ref.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[ref] = sha
    return kept
def _set_origin_head(refs, origin, origin_head):
    """Point refs/remotes/<origin>/HEAD at the remote-tracking default branch.

    Nothing is changed unless origin_head is a local branch ref and the
    matching remote-tracking ref is present in refs.
    """
    # set refs/remotes/origin/HEAD
    origin_base = b"refs/remotes/" + origin + b"/"
    if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    branch_name = origin_head[len(LOCAL_BRANCH_PREFIX) :]
    origin_ref = origin_base + HEADREF
    target_ref = origin_base + branch_name
    if target_ref in refs:
        refs.set_symbolic_ref(origin_ref, target_ref)
def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: bytes,
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    Args:
      refs: Container to create the local branch in
      origin: Name of the remote
      origin_head: Ref that the remote's HEAD points at; used when branch
        is empty
      branch: Branch or tag name requested explicitly; takes precedence
        over origin_head
      ref_message: Optional reflog message for the created ref
    Returns: The ref name that HEAD should point at
    Raises:
      ValueError: if branch names neither a remote branch nor a tag, or if
        neither branch nor origin_head is given
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            # The message must be passed by keyword: the third positional
            # parameter of add_if_new() is the committer.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif LOCAL_TAG_PREFIX + branch in refs:
            head_ref = LOCAL_TAG_PREFIX + branch
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # origin_ref does not resolve, so there is nothing to copy.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref
def _set_head(refs, head_ref, ref_message):
    """Update HEAD for the given ref and return the value it was set to.

    For a tag ref, HEAD is deleted and rewritten with the tag's value. For
    any other ref, HEAD becomes a symref to head_ref; if head_ref cannot be
    resolved, None is returned.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            head = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            head = None
        return head

    # detach HEAD at specified tag
    head = refs[head_ref]
    if isinstance(head, Tag):
        _cls, obj = head.object
        head = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, head, message=ref_message)
    return head
def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: Dict[str, str],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
):
    """Import a remote's branches and tags into a refs container.

    Branches are stored under refs/remotes/<remote_name>/ and tags under
    refs/tags/. Peeled entries in refs are ignored.

    Args:
      refs_container: Container to import into
      remote_name: Name of the remote; encoded to bytes for the ref prefix
      refs: Mapping of remote ref names to SHAs. The names are compared
        against bytes prefixes, so they are expected to be bytes.
      message: Optional reflog message
      prune: Whether to remove remote-tracking branches missing from refs
      prune_tags: Whether to remove tags missing from refs
    """
    stripped_refs = strip_peeled_refs(refs)
    branches = {
        n[len(LOCAL_BRANCH_PREFIX) :]: v
        for (n, v) in stripped_refs.items()
        if n.startswith(LOCAL_BRANCH_PREFIX)
    }
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        branches,
        message=message,
        prune=prune,
    )
    # Peeled names are already gone from stripped_refs, so only the prefix
    # needs checking here.
    tags = {
        n[len(LOCAL_TAG_PREFIX) :]: v
        for (n, v) in stripped_refs.items()
        if n.startswith(LOCAL_TAG_PREFIX)
    }
    # import_refs() joins base and name with "/", so the base must not end
    # in a slash or the result would be "refs/tags//<name>".
    refs_container.import_refs(
        LOCAL_TAG_PREFIX.rstrip(b"/"), tags, message=message, prune=prune_tags
    )
def serialize_refs(store, refs):
    """Build a ref dictionary that includes peeled entries for tags.

    Refs whose SHA is missing from the store are skipped with a UserWarning.

    Args:
      store: Object store used to peel the SHAs
      refs: Dictionary mapping ref names to SHAs
    Returns: Dictionary mapping each ref name to the id of its object and,
        for tags, the name plus the peeled-tag suffix to the peeled id
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for ref, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            warnings.warn(
                "ref {} points at non-present sha {}".format(
                    ref.decode("utf-8", "replace"), sha.decode("ascii")
                ),
                UserWarning,
            )
            continue
        if isinstance(unpeeled, Tag):
            serialized[ref + PEELED_TAG_SUFFIX] = peeled.id
        serialized[ref] = unpeeled.id
    return serialized