Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

695 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import types 

27import warnings 

28from collections.abc import Iterator 

29from contextlib import suppress 

30from typing import TYPE_CHECKING, Any, Optional, Union 

31 

32if TYPE_CHECKING: 

33 from .file import _GitFile 

34 

35from .errors import PackedRefsException, RefFormatError 

36from .file import GitFile, ensure_dir_exists 

37from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

38from .pack import ObjectContainer 

39 

# Ref names are handled as plain byte strings throughout this module.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Marker that prefixes the contents of a symbolic ref.
SYMREF = b"ref: "
# Namespace prefixes (branch/tag/remote/notes, going by their names).
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values that check_ref_format() rejects anywhere in a ref name.
# Iterating/containment on this set works with ints, not 1-byte strings.
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# Presumably the suffix that marks the peeled value of a tag -- its use is
# not visible in this part of the file; confirm against the callers.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

53 

54 

class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    Attributes:
      ref: Name of the ref whose resolution was abandoned
      depth: Number of symref hops that had been followed at that point
    """

    def __init__(self, ref, depth) -> None:
        # Hand the values to Exception as well, so that ``args``, ``str()``
        # and tracebacks identify the offending ref instead of being empty.
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth

61 

62 

def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target of a symbolic ref from its raw contents.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The ref name that follows the marker, with any trailing
      carriage returns / newlines removed
    Raises:
      ValueError: if contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")

73 

74 

def check_ref_format(refname: Ref) -> bool:
    """Tell whether a refname is well formed.

    The checks below follow the rule list of git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # Kept as individual guards, in the same order, to mirror [1].
    if refname.startswith(b".") or b"/." in refname:
        return False
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # Iterating bytes yields ints, so each byte can be compared directly:
    # no control characters (below 0o40) and none of the reserved bytes.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # refname is known to be non-empty here because it contains a b"/".
    if refname.endswith((b"/", b".")):
        return False
    if refname.endswith(b".lock"):
        return False
    if b"@{" in refname:
        return False
    if b"\\" in refname:
        return False
    return True

107 

108 

def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); everything after the first
      slash following the remote name belongs to the branch name

    Raises:
      ValueError: If ref does not live under the remotes prefix, or has no
        slash separating the remote name from the branch name
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    without_prefix = ref[len(LOCAL_REMOTE_PREFIX) :]
    remote_name, separator, branch_name = without_prefix.partition(b"/")
    if not separator:
        # No slash at all: there is a remote name but no branch part.
        raise ValueError(f"Invalid remote ref format: {ref!r}")

    return (remote_name, branch_name)

134 

135 

class RefsContainer:
    """A container for refs.

    This base class implements the read-side helpers (iteration, lookup,
    symref following) on top of a few primitives that subclasses provide:
    allkeys(), read_loose_ref(), get_packed_refs() and the mutating
    methods, all of which raise NotImplementedError here.
    """

    def __init__(self, logger=None) -> None:
        """Create the container.

        Args:
          logger: Optional callable invoked as
            logger(ref, old_sha, new_sha, committer, timestamp, timezone,
            message) for every logged ref change
        """
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Report a ref change to the logger, if there is one.

        Nothing is logged when no logger was configured or when message is
        None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation has no cache and always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, Optional[ObjectID]],
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import a set of refs underneath a base.

        Args:
          base: Prefix (without trailing slash) the imported names are
            joined to with a b"/"
          other: Mapping of names relative to base to their new value; a
            value of None means the ref is removed
          committer: Committer passed on for the reflog
          timestamp: Timestamp passed on for the reflog
          timezone: Timezone passed on for the reflog
          message: Message passed on for the reflog
          prune: If True, refs that exist under base but are not set by
            other are removed as well
        """
        if prune:
            to_delete = set(self.subkeys(base))
        else:
            to_delete = set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
            else:
                self.set_if_equals(
                    b"/".join((base, name)),
                    None,
                    value,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
                # The ref was just written, so it must survive pruning.
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> Iterator[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        """Iterate over the names returned by allkeys()."""
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; a trailing slash is
            tolerated. An empty base selects every ref.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        stem = base.rstrip(b"/")
        if not stem:
            return set(self.allkeys())
        # Match on "<base>/" rather than on the bare base so that sibling
        # namespaces sharing a textual prefix (e.g. b"refs/headsX" for base
        # b"refs/heads") are not picked up and then mangled by the slice.
        prefix = stem + b"/"
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base=None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Args:
          base: Optional base; when given, only refs under it are returned
            and the keys are relative to it
        Returns: Dictionary mapping ref names to the SHA they resolve to.
            Refs that cannot be resolved are left out.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                # strip() drops the leading slash produced by an empty base.
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or b"refs/stash" and is
            otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        # name[5:] strips the leading b"refs/" before the format check.
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        The loose ref is consulted first; the packed refs are only used
        when no loose content was found.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain. sha is None (or empty) when the last
            ref in the chain has no contents.
        Raises:
          SymrefLoop: if more than five symrefs had to be followed
        """
        # Start as if name itself had been read from a symref, so the loop
        # body handles the first ref like every other link of the chain.
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        """Tell whether refname has (non-empty) contents in this container."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the ref (or the end of its symref chain) is missing
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            contents = self.read_ref(src)
            if not contents:
                # Listed but unreadable/empty: cannot be a symref.
                continue
            try:
                dst = parse_symref_value(contents)
            except ValueError:
                # Regular (non-symbolic) ref.
                continue
            ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)

479 

480 

class DictRefsContainer(RefsContainer):
    """RefsContainer that keeps all of its refs in a plain dict.

    There are no packed refs behind this container (get_packed_refs() is
    always empty) and it is not threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        """Wrap an existing mapping of ref names to ref contents.

        Args:
          refs: The dict used as backing store; it is used as-is, not copied
          logger: Optional reflog callable, see RefsContainer
        """
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self):
        """Return the names of all refs in the backing dict."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the stored contents of name, or None when it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict: nothing is ever packed in this container."""
        return {}

    def _notify(self, ref, newsha) -> None:
        """Tell every registered watcher that ref now has value newsha."""
        for observer in self._watchers:
            observer._notify((ref, newsha))

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make name a symbolic ref that points at other.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message for the reflog
        """
        # What name resolved to before the change; used as the old value
        # in the log entry.
        previous = self.follow(name)[-1]
        symref = SYMREF + other
        self._refs[name] = symref
        self._notify(name, symref)
        self._log(
            name,
            previous,
            symref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set name to new_ref, provided it currently equals old_ref.

        Only the named entry itself is compared and written; symbolic refs
        are not followed.

        Args:
          name: The refname to set.
          old_ref: The value name must currently have (a missing ref
            counts as ZERO_SHA), or None to set unconditionally.
          new_ref: The new value for name.
          message: Message for reflog
        Returns: True if the ref was set, False if the comparison failed.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Store ref under name unless name is already present.

        Args:
          name: Ref name
          ref: Ref value
          message: Optional message for the reflog
        Returns: True if the ref was added, False if it already existed.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove name, provided it currently equals old_ref.

        Args:
          name: The refname to delete.
          old_ref: The value name must currently have (a missing ref
            counts as ZERO_SHA), or None to delete unconditionally.
          message: Message for reflog
        Returns: False if the comparison failed, True otherwise -- also
            when there was nothing to remove.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            # Watchers and the log only hear about refs that really existed.
            removed = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                removed,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name):
        """Return the cached peeled value for name, or None if not cached."""
        return self._peeled.get(name)

    def _update(self, refs) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, value in refs.items():
            self.set_if_equals(refname, None, value)

    def _update_peeled(self, peeled) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)

623 

624 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file.

    The file is parsed once, on construction; none of the mutating
    RefsContainer methods are implemented by this class.
    """

    def __init__(self, f) -> None:
        """Load refs and peeled values from an info/refs file object.

        Args:
          f: File-like object handed to read_info_refs()
        """
        # Initialise the base class so that inherited helpers relying on
        # its state (e.g. _log() reading self._logger) keep working.
        super().__init__()
        refs = read_info_refs(f)
        self._refs, self._peeled = split_peeled_refs(refs)

    def allkeys(self):
        """Return the names of all refs read from the file."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the value recorded for name, or None when it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict: this container has no packed refs."""
        return {}

    def get_peeled(self, name):
        """Return the peeled value of a ref.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value recorded for name, or the value of the
            ref itself when no peeled value was recorded.
        Raises:
          KeyError: if name is not a known ref at all
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]

648 

649 

650class DiskRefsContainer(RefsContainer): 

651 """Refs container that reads refs from disk.""" 

652 

653 def __init__( 

654 self, 

655 path: Union[str, bytes, os.PathLike], 

656 worktree_path: Optional[Union[str, bytes, os.PathLike]] = None, 

657 logger=None, 

658 ) -> None: 

659 super().__init__(logger=logger) 

660 # Convert path-like objects to strings, then to bytes for Git compatibility 

661 self.path = os.fsencode(os.fspath(path)) 

662 if worktree_path is None: 

663 self.worktree_path = self.path 

664 else: 

665 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

666 self._packed_refs = None 

667 self._peeled_refs = None 

668 

669 def __repr__(self) -> str: 

670 return f"{self.__class__.__name__}({self.path!r})" 

671 

672 def subkeys(self, base): 

673 subkeys = set() 

674 path = self.refpath(base) 

675 for root, unused_dirs, files in os.walk(path): 

676 directory = root[len(path) :] 

677 if os.path.sep != "/": 

678 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

679 directory = directory.strip(b"/") 

680 for filename in files: 

681 refname = b"/".join(([directory] if directory else []) + [filename]) 

682 # check_ref_format requires at least one /, so we prepend the 

683 # base before calling it. 

684 if check_ref_format(base + b"/" + refname): 

685 subkeys.add(refname) 

686 for key in self.get_packed_refs(): 

687 if key.startswith(base): 

688 subkeys.add(key[len(base) :].strip(b"/")) 

689 return subkeys 

690 

691 def allkeys(self): 

692 allkeys = set() 

693 if os.path.exists(self.refpath(HEADREF)): 

694 allkeys.add(HEADREF) 

695 path = self.refpath(b"") 

696 refspath = self.refpath(b"refs") 

697 for root, unused_dirs, files in os.walk(refspath): 

698 directory = root[len(path) :] 

699 if os.path.sep != "/": 

700 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

701 for filename in files: 

702 refname = b"/".join([directory, filename]) 

703 if check_ref_format(refname): 

704 allkeys.add(refname) 

705 allkeys.update(self.get_packed_refs()) 

706 return allkeys 

707 

708 def refpath(self, name): 

709 """Return the disk path of a ref.""" 

710 if os.path.sep != "/": 

711 name = name.replace(b"/", os.fsencode(os.path.sep)) 

712 # TODO: as the 'HEAD' reference is working tree specific, it 

713 # should actually not be a part of RefsContainer 

714 if name == HEADREF: 

715 return os.path.join(self.worktree_path, name) 

716 else: 

717 return os.path.join(self.path, name) 

718 

719 def get_packed_refs(self): 

720 """Get contents of the packed-refs file. 

721 

722 Returns: Dictionary mapping ref names to SHA1s 

723 

724 Note: Will return an empty dictionary when no packed-refs file is 

725 present. 

726 """ 

727 # TODO: invalidate the cache on repacking 

728 if self._packed_refs is None: 

729 # set both to empty because we want _peeled_refs to be 

730 # None if and only if _packed_refs is also None. 

731 self._packed_refs = {} 

732 self._peeled_refs = {} 

733 path = os.path.join(self.path, b"packed-refs") 

734 try: 

735 f = GitFile(path, "rb") 

736 except FileNotFoundError: 

737 return {} 

738 with f: 

739 first_line = next(iter(f)).rstrip() 

740 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

741 for sha, name, peeled in read_packed_refs_with_peeled(f): 

742 self._packed_refs[name] = sha 

743 if peeled: 

744 self._peeled_refs[name] = peeled 

745 else: 

746 f.seek(0) 

747 for sha, name in read_packed_refs(f): 

748 self._packed_refs[name] = sha 

749 return self._packed_refs 

750 

751 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None: 

752 """Add the given refs as packed refs. 

753 

754 Args: 

755 new_refs: A mapping of ref names to targets; if a target is None that 

756 means remove the ref 

757 """ 

758 if not new_refs: 

759 return 

760 

761 path = os.path.join(self.path, b"packed-refs") 

762 

763 with GitFile(path, "wb") as f: 

764 # reread cached refs from disk, while holding the lock 

765 packed_refs = self.get_packed_refs().copy() 

766 

767 for ref, target in new_refs.items(): 

768 # sanity check 

769 if ref == HEADREF: 

770 raise ValueError("cannot pack HEAD") 

771 

772 # remove any loose refs pointing to this one -- please 

773 # note that this bypasses remove_if_equals as we don't 

774 # want to affect packed refs in here 

775 with suppress(OSError): 

776 os.remove(self.refpath(ref)) 

777 

778 if target is not None: 

779 packed_refs[ref] = target 

780 else: 

781 packed_refs.pop(ref, None) 

782 

783 write_packed_refs(f, packed_refs, self._peeled_refs) 

784 

785 self._packed_refs = packed_refs 

786 

787 def get_peeled(self, name): 

788 """Return the cached peeled value of a ref, if available. 

789 

790 Args: 

791 name: Name of the ref to peel 

792 Returns: The peeled value of the ref. If the ref is known not point to 

793 a tag, this will be the SHA the ref refers to. If the ref may point 

794 to a tag, but no cached information is available, None is returned. 

795 """ 

796 self.get_packed_refs() 

797 if self._peeled_refs is None or name not in self._packed_refs: 

798 # No cache: no peeled refs were read, or this ref is loose 

799 return None 

800 if name in self._peeled_refs: 

801 return self._peeled_refs[name] 

802 else: 

803 # Known not peelable 

804 return self[name] 

805 

806 def read_loose_ref(self, name): 

807 """Read a reference file and return its contents. 

808 

809 If the reference file a symbolic reference, only read the first line of 

810 the file. Otherwise, only read the first 40 bytes. 

811 

812 Args: 

813 name: the refname to read, relative to refpath 

814 Returns: The contents of the ref file, or None if the file does not 

815 exist. 

816 

817 Raises: 

818 IOError: if any other error occurs 

819 """ 

820 filename = self.refpath(name) 

821 try: 

822 with GitFile(filename, "rb") as f: 

823 header = f.read(len(SYMREF)) 

824 if header == SYMREF: 

825 # Read only the first line 

826 return header + next(iter(f)).rstrip(b"\r\n") 

827 else: 

828 # Read only the first 40 bytes 

829 return header + f.read(40 - len(SYMREF)) 

830 except (OSError, UnicodeError): 

831 # don't assume anything specific about the error; in 

832 # particular, invalid or forbidden paths can raise weird 

833 # errors depending on the specific operating system 

834 return None 

835 

836 def _remove_packed_ref(self, name) -> None: 

837 if self._packed_refs is None: 

838 return 

839 filename = os.path.join(self.path, b"packed-refs") 

840 # reread cached refs from disk, while holding the lock 

841 f = GitFile(filename, "wb") 

842 try: 

843 self._packed_refs = None 

844 self.get_packed_refs() 

845 

846 if name not in self._packed_refs: 

847 f.abort() 

848 return 

849 

850 del self._packed_refs[name] 

851 with suppress(KeyError): 

852 del self._peeled_refs[name] 

853 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

854 f.close() 

855 except BaseException: 

856 f.abort() 

857 raise 

858 

859 def set_symbolic_ref( 

860 self, 

861 name, 

862 other, 

863 committer=None, 

864 timestamp=None, 

865 timezone=None, 

866 message=None, 

867 ) -> None: 

868 """Make a ref point at another ref. 

869 

870 Args: 

871 name: Name of the ref to set 

872 other: Name of the ref to point at 

873 message: Optional message to describe the change 

874 """ 

875 self._check_refname(name) 

876 self._check_refname(other) 

877 filename = self.refpath(name) 

878 f = GitFile(filename, "wb") 

879 try: 

880 f.write(SYMREF + other + b"\n") 

881 sha = self.follow(name)[-1] 

882 self._log( 

883 name, 

884 sha, 

885 sha, 

886 committer=committer, 

887 timestamp=timestamp, 

888 timezone=timezone, 

889 message=message, 

890 ) 

891 except BaseException: 

892 f.abort() 

893 raise 

894 else: 

895 f.close() 

896 

897 def set_if_equals( 

898 self, 

899 name, 

900 old_ref, 

901 new_ref, 

902 committer=None, 

903 timestamp=None, 

904 timezone=None, 

905 message=None, 

906 ) -> bool: 

907 """Set a refname to new_ref only if it currently equals old_ref. 

908 

909 This method follows all symbolic references, and can be used to perform 

910 an atomic compare-and-swap operation. 

911 

912 Args: 

913 name: The refname to set. 

914 old_ref: The old sha the refname must refer to, or None to set 

915 unconditionally. 

916 new_ref: The new sha the refname will refer to. 

917 message: Set message for reflog 

918 Returns: True if the set was successful, False otherwise. 

919 """ 

920 self._check_refname(name) 

921 try: 

922 realnames, _ = self.follow(name) 

923 realname = realnames[-1] 

924 except (KeyError, IndexError, SymrefLoop): 

925 realname = name 

926 filename = self.refpath(realname) 

927 

928 # make sure none of the ancestor folders is in packed refs 

929 probe_ref = os.path.dirname(realname) 

930 packed_refs = self.get_packed_refs() 

931 while probe_ref: 

932 if packed_refs.get(probe_ref, None) is not None: 

933 raise NotADirectoryError(filename) 

934 probe_ref = os.path.dirname(probe_ref) 

935 

936 ensure_dir_exists(os.path.dirname(filename)) 

937 with GitFile(filename, "wb") as f: 

938 if old_ref is not None: 

939 try: 

940 # read again while holding the lock to handle race conditions 

941 orig_ref = self.read_loose_ref(realname) 

942 if orig_ref is None: 

943 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA) 

944 if orig_ref != old_ref: 

945 f.abort() 

946 return False 

947 except OSError: 

948 f.abort() 

949 raise 

950 

951 # Check if ref already has the desired value while holding the lock 

952 # This avoids fsync when ref is unchanged but still detects lock conflicts 

953 current_ref = self.read_loose_ref(realname) 

954 if current_ref is None: 

955 current_ref = packed_refs.get(realname, None) 

956 

957 if current_ref is not None and current_ref == new_ref: 

958 # Ref already has desired value, abort write to avoid fsync 

959 f.abort() 

960 return True 

961 

962 try: 

963 f.write(new_ref + b"\n") 

964 except OSError: 

965 f.abort() 

966 raise 

967 self._log( 

968 realname, 

969 old_ref, 

970 new_ref, 

971 committer=committer, 

972 timestamp=timestamp, 

973 timezone=timezone, 

974 message=message, 

975 ) 

976 return True 

977 

978 def add_if_new( 

979 self, 

980 name: bytes, 

981 ref: bytes, 

982 committer=None, 

983 timestamp=None, 

984 timezone=None, 

985 message: Optional[bytes] = None, 

986 ) -> bool: 

987 """Add a new reference only if it does not already exist. 

988 

989 This method follows symrefs, and only ensures that the last ref in the 

990 chain does not exist. 

991 

992 Args: 

993 name: The refname to set. 

994 ref: The new sha the refname will refer to. 

995 message: Optional message for reflog 

996 Returns: True if the add was successful, False otherwise. 

997 """ 

998 try: 

999 realnames, contents = self.follow(name) 

1000 if contents is not None: 

1001 return False 

1002 realname = realnames[-1] 

1003 except (KeyError, IndexError): 

1004 realname = name 

1005 self._check_refname(realname) 

1006 filename = self.refpath(realname) 

1007 ensure_dir_exists(os.path.dirname(filename)) 

1008 with GitFile(filename, "wb") as f: 

1009 if os.path.exists(filename) or name in self.get_packed_refs(): 

1010 f.abort() 

1011 return False 

1012 try: 

1013 f.write(ref + b"\n") 

1014 except OSError: 

1015 f.abort() 

1016 raise 

1017 else: 

1018 self._log( 

1019 name, 

1020 None, 

1021 ref, 

1022 committer=committer, 

1023 timestamp=timestamp, 

1024 timezone=timezone, 

1025 message=message, 

1026 ) 

1027 return True 

1028 

def remove_if_equals(
    self,
    name,
    old_ref,
    committer=None,
    timestamp=None,
    timezone=None,
    message=None,
) -> bool:
    """Delete a ref, provided it currently holds ``old_ref``.

    Symbolic references are not followed. The ref's lock file is held
    while the current value is compared and the ref is removed, which makes
    this usable as a compare-and-delete operation.

    Args:
      name: The refname to delete.
      old_ref: The sha the refname must currently refer to, or None to
        delete unconditionally.
      committer: Forwarded to the ref log entry.
      timestamp: Forwarded to the ref log entry.
      timezone: Forwarded to the ref log entry.
      message: Optional message, forwarded to the ref log entry.
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    ref_path = self.refpath(name)
    ensure_dir_exists(os.path.dirname(ref_path))
    lock = GitFile(ref_path, "wb")
    try:
        if old_ref is not None:
            # Loose value wins; fall back to packed refs, then ZERO_SHA.
            current = self.read_loose_ref(name)
            if current is None:
                current = self.get_packed_refs().get(name, ZERO_SHA)
            if current != old_ref:
                return False

        # Drop the loose ref file, if there is one.
        try:
            present = os.path.lexists(ref_path)
        except OSError:
            # may only be packed, or otherwise unstorable
            present = False
        if present:
            os.remove(ref_path)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # Nothing is ever written through the lock file; it only served
        # as the lock.
        lock.abort()

    # With the lock released, prune parent directories that may now be
    # empty, so that a ref named like one of those directories can be
    # created afterwards.
    ancestor = name
    while b"/" in ancestor:
        ancestor = ancestor.rsplit(b"/", 1)[0]
        if ancestor == b"refs":
            break
        ancestor_path = self.refpath(ancestor)
        try:
            os.rmdir(ancestor_path)
        except OSError:
            # Directory not empty, already removed by another process,
            # etc. The ref itself is gone, so this is not fatal.
            break

    return True

1109 

def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    HEAD is never packed. Refs whose lookup raises KeyError, or whose
    value is empty, are left out.

    Args:
      all: If True, pack all refs. If False, only pack tags.
    """
    packable: dict[Ref, Optional[ObjectID]] = {}
    for refname in self.allkeys():
        if refname == HEADREF:
            # Never pack HEAD
            continue
        if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            value = self[refname]
        except KeyError:
            # Broken ref, skip it
            continue
        if value:
            packable[refname] = value

    if packable:
        self.add_packed_refs(packable)

1132 

1133 

def _split_ref_line(line):
    """Parse one ``<sha> <refname>`` line into a ``(sha, name)`` tuple.

    Trailing newline / carriage-return characters are stripped first.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the sha is not a valid hex sha, or the
        name fails ``check_ref_format``.
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    hexsha, refname = parts
    if not valid_hexsha(hexsha):
        raise PackedRefsException(f"Invalid hex sha {hexsha!r}")
    if not check_ref_format(refname):
        raise PackedRefsException(f"invalid ref name {refname!r}")
    return (hexsha, refname)

1145 

1146 

def read_packed_refs(f):
    """Iterate over the entries of a packed refs file without peeled lines.

    Lines starting with ``#`` are treated as comments and skipped.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a peeled (``^``) line is encountered.
    """
    for raw in f:
        if raw.startswith(b"^"):
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        if not raw.startswith(b"#"):
            yield _split_ref_line(raw)

1161 

1162 

def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of (SHA1, ref name, peeled SHA1 or None). A ref line is only
    emitted once the following line has been seen, because that line may
    be the ``^<sha>`` peeled value belonging to it.

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled line appears without a preceding
        ref line, or a peeled sha is not a valid hex sha.
    """
    pending = None
    for line in f:
        # Indexing bytes yields an int in Python 3, so ``line[0] == b"#"``
        # could never match; use startswith to actually skip comments.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not pending:
                raise PackedRefsException("unexpected peeled ref line")
            peeled = line[1:]
            if not valid_hexsha(peeled):
                raise PackedRefsException(f"Invalid hex sha {peeled!r}")
            sha, name = _split_ref_line(pending)
            pending = None
            yield (sha, name, peeled)
        else:
            if pending:
                # Previous ref had no peeled line following it.
                sha, name = _split_ref_line(pending)
                yield (sha, name, None)
            pending = line
    if pending:
        sha, name = _split_ref_line(pending)
        yield (sha, name, None)

1193 

1194 

def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
    """Serialize packed refs to a file, sorted by ref name.

    The "# pack-refs with: peeled" header is written whenever
    ``peeled_refs`` is passed (even if it is empty); each ref that has an
    entry in ``peeled_refs`` is followed by a ``^<peeled sha>`` line.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    else:
        peeled_refs = {}
    for refname, sha in sorted(packed_refs.items(), key=lambda item: item[0]):
        f.write(git_line(sha, refname))
        if refname in peeled_refs:
            f.write(b"^" + peeled_refs[refname] + b"\n")

1211 

1212 

def read_info_refs(f):
    """Parse ``<sha>\\t<name>`` lines into a dict of name to sha.

    Args:
      f: file-like object providing ``readlines()`` that returns bytes lines
    Returns: dict mapping ref names to SHA1s; on duplicate names the last
      line wins.
    """
    pairs = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {name: sha for (sha, name) in pairs}

1219 

1220 

def write_info_refs(refs, store: ObjectContainer):
    """Generate info refs lines.

    Yields one ``<sha>\\t<name>\\n`` line per ref, in sorted order. When the
    object peels to a different id, an extra line for the peeled id is
    yielded with ``PEELED_TAG_SUFFIX`` appended to the name. HEAD and refs
    whose sha is not present in ``store`` are skipped.

    Args:
      refs: dict of ref name to sha
      store: object container used to look up and peel the shas
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for name, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _, peeled = peel_sha(store, sha)
        yield obj.id + b"\t" + name + b"\n"
        if obj.id != peeled.id:
            yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"

1239 

1240 

def is_local_branch(x):
    """Return whether ref name ``x`` starts with the local branch prefix."""
    prefix = LOCAL_BRANCH_PREFIX
    return x.startswith(prefix)

1243 

1244 

def strip_peeled_refs(refs):
    """Return a copy of ``refs`` without the peeled entries.

    An entry counts as peeled when its name ends with PEELED_TAG_SUFFIX.
    """
    kept = {}
    for ref, sha in refs.items():
        if ref.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[ref] = sha
    return kept

1250 

1251 

def split_peeled_refs(refs):
    """Separate peeled entries from regular ones.

    Returns: tuple ``(regular, peeled)`` of dicts; keys in ``peeled`` have
      the PEELED_TAG_SUFFIX removed.
    """
    regular = {}
    peeled = {}
    for ref, sha in refs.items():
        if not ref.endswith(PEELED_TAG_SUFFIX):
            regular[ref] = sha
            continue
        peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
    return regular, peeled

1262 

1263 

1264def _set_origin_head(refs, origin, origin_head) -> None: 

1265 # set refs/remotes/origin/HEAD 

1266 origin_base = b"refs/remotes/" + origin + b"/" 

1267 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1268 origin_ref = origin_base + HEADREF 

1269 target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :] 

1270 if target_ref in refs: 

1271 refs.set_symbolic_ref(origin_ref, target_ref) 

1272 

1273 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    If ``branch`` is given, the matching remote-tracking ref is used to
    create the local branch (when it does not exist yet); failing that, a
    tag of that name is selected. Otherwise ``origin_head`` is used.

    Args:
      refs: refs container to update
      origin: name of the remote
      origin_head: ref HEAD points at on the remote, if known
      branch: requested branch (or tag) name; may be empty
      ref_message: message recorded for the created ref
    Returns: the ref that HEAD should point at.
    Raises:
      ValueError: if ``branch`` names neither a remote branch nor a tag,
        or if neither ``branch`` nor ``origin_head`` is provided.
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            # Pass the message by keyword: positionally it would land in
            # add_if_new's ``committer`` slot rather than ``message``.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif LOCAL_TAG_PREFIX + branch in refs:
            head_ref = LOCAL_TAG_PREFIX + branch
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The remote HEAD target was not fetched; nothing to create.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref

1306 

1307 

def _set_head(refs, head_ref, ref_message):
    """Point HEAD at ``head_ref`` and return the value HEAD resolves to.

    For a ref under the tag prefix, HEAD is deleted and re-created holding
    the tag's value directly (detached). For any other ref, HEAD becomes a
    symbolic ref to ``head_ref``; if a KeyError is raised along the way,
    None is returned.

    Args:
      refs: refs container to update
      head_ref: ref HEAD should point at
      ref_message: message recorded when setting HEAD
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            head = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            head = None
        return head

    # detach HEAD at specified tag
    head = refs[head_ref]
    # NOTE(review): this branch only runs if the container returns a Tag
    # object rather than a sha, and calls get_object on the unpacked value
    # -- confirm whether it is reachable/intended.
    if isinstance(head, Tag):
        _cls, obj = head.object
        head = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, head, message=ref_message)
    return head

1326 

1327 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[str, str],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into ``refs_container``.

    Peeled entries are dropped first. Branches are imported under
    ``refs/remotes/<remote_name>``, tags under the local tag prefix.

    Args:
      refs_container: container receiving the refs
      remote_name: name of the remote
      refs: refs advertised by the remote
      message: message passed on to ``import_refs``
      prune: passed on as ``prune`` for the branch import
      prune_tags: passed on as ``prune`` for the tag import
    """
    # NOTE(review): names are matched against bytes prefixes below, so the
    # ``dict[str, str]`` annotation looks inaccurate -- confirm.
    branches = {}
    tags = {}
    for refname, value in strip_peeled_refs(refs).items():
        if refname.startswith(LOCAL_BRANCH_PREFIX):
            branches[refname[len(LOCAL_BRANCH_PREFIX) :]] = value
        if refname.startswith(LOCAL_TAG_PREFIX) and not refname.endswith(
            PEELED_TAG_SUFFIX
        ):
            tags[refname[len(LOCAL_TAG_PREFIX) :]] = value

    remote_base = b"refs/remotes/" + remote_name.encode()
    refs_container.import_refs(
        remote_base,
        branches,
        message=message,
        prune=prune,
    )
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )

1356 

1357 

def serialize_refs(store, refs):
    """Build a dict of refs including peeled entries for tags.

    Each ref maps to the id of the object it points at. When that object
    is a Tag, an additional entry named ``ref + PEELED_TAG_SUFFIX`` maps to
    the peeled object's id. Refs whose sha cannot be peeled because it is
    missing from ``store`` trigger a UserWarning and are omitted.

    Args:
      store: object store used to peel the shas
      refs: dict of ref name to sha
    Returns: dict of ref name to sha
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for ref, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            warnings.warn(
                "ref {} points at non-present sha {}".format(
                    ref.decode("utf-8", "replace"), sha.decode("ascii")
                ),
                UserWarning,
            )
            continue
        if isinstance(unpeeled, Tag):
            serialized[ref + PEELED_TAG_SUFFIX] = peeled.id
        serialized[ref] = unpeeled.id
    return serialized

1379 

1380 

class locked_ref:
    """Hold the lock on a ref while it is inspected or modified.

    Intended to be used as a context manager: entering acquires the lock
    file of the (symref-resolved) ref, leaving either commits what was
    written or aborts.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        self._refs_container = refs_container
        self._refname = refname
        # Lock file; only set while inside the context.
        self._file: Optional[_GitFile] = None
        # Name of the ref actually locked, after following symrefs.
        self._realname: Optional[Ref] = None
        self._deleted = False

    def _require_file(self):
        """Return the lock file, raising if the context was not entered."""
        if not self._file:
            raise RuntimeError("locked_ref not in context")
        return self._file

    def __enter__(self) -> "locked_ref":
        container = self._refs_container
        container._check_refname(self._refname)
        try:
            chain, _ = container.follow(self._refname)
            self._realname = chain[-1]
        except (KeyError, IndexError, SymrefLoop):
            # Could not resolve; lock the name as given.
            self._realname = self._refname

        path = container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(path))
        self._file = GitFile(path, "wb")
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if not self._file:
            return
        # Only commit when the body succeeded and the ref was not deleted.
        if exc_type is None and not self._deleted:
            self._file.close()
        else:
            self._file.abort()

    def get(self) -> Optional[bytes]:
        """Return the ref's current value, or None if it does not exist.

        The loose ref is consulted first, then the packed refs.
        """
        self._require_file()

        container = self._refs_container
        value = container.read_loose_ref(self._realname)
        if value is not None:
            return value
        return container.get_packed_refs().get(self._realname, None)

    def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
        """Check the ref's current value against an expected one.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        return self.get() == expected_value

    def set(self, new_ref: bytes) -> None:
        """Replace the pending content of the ref with a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value
        Raises:
          ValueError: if ``new_ref`` is neither a valid hex sha nor starts
            with the symref marker.
        """
        lock = self._require_file()

        if not valid_hexsha(new_ref) and not new_ref.startswith(SYMREF):
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Discard anything written earlier under this lock.
        lock.seek(0)
        lock.truncate()
        lock.write(new_ref + b"\n")
        self._deleted = False

    def set_symbolic_ref(self, target: Ref) -> None:
        """Turn this ref into a symbolic ref.

        Args:
          target: Name of the ref to point at
        """
        lock = self._require_file()

        self._refs_container._check_refname(target)
        lock.seek(0)
        lock.truncate()
        lock.write(SYMREF + target + b"\n")
        self._deleted = False

    def delete(self) -> None:
        """Remove the ref (loose file and packed entry) under the lock."""
        self._require_file()

        realname = self._realname
        if realname:
            path = self._refs_container.refpath(realname)
            try:
                if os.path.lexists(path):
                    os.remove(path)
            except FileNotFoundError:
                pass
            self._refs_container._remove_packed_ref(realname)

        # Makes __exit__ abort the lock file instead of committing it.
        self._deleted = True