Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

627 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import warnings 

27from collections.abc import Iterator 

28from contextlib import suppress 

29from typing import Any, Optional, Union 

30 

31from .errors import PackedRefsException, RefFormatError 

32from .file import GitFile, ensure_dir_exists 

33from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

34from .pack import ObjectContainer 

35 

# Ref names are plain bytes throughout this module.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Prefix that marks the contents of a ref as a symbolic reference.
SYMREF = b"ref: "
# Name prefixes for the standard ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values (iterating bytes yields ints) rejected by check_ref_format:
# DEL (0o177), space, and the characters ~ ^ : ? * [
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# Suffix appended to a ref name to denote its peeled value.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

49 

50 

class SymrefLoop(Exception):
    """Raised when symbolic references could not be resolved within the depth limit.

    Attributes:
      ref: the ref name that was being resolved
      depth: the number of hops taken before giving up
    """

    def __init__(self, ref, depth) -> None:
        # Store both pieces of context for callers that want to report them.
        self.ref, self.depth = ref, depth

57 

58 

def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target ref name from symbolic-ref contents.

    Args:
      contents: Raw ref contents, expected to start with ``SYMREF``
    Returns: The bytes following the ``SYMREF`` prefix, with trailing CR/LF
      characters removed
    Raises:
      ValueError: if contents does not start with ``SYMREF``
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")

69 

70 

def check_ref_format(refname: Ref) -> bool:
    """Check whether a refname is correctly formatted.

    The checks below are modelled on git-check-ref-format[1] and are kept as
    separate steps so they can be compared against that document.

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname passes every check, False otherwise
    """
    # No path component may begin with a dot.
    if refname.startswith(b".") or b"/." in refname:
        return False
    # At least one slash is required.
    if b"/" not in refname:
        return False
    # No ".." anywhere.
    if b".." in refname:
        return False
    # No control characters (below 0o40) and none of the BAD_REF_CHARS.
    # Iterating bytes yields ints, which is what BAD_REF_CHARS contains.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # May not end in a slash or a dot (refname is non-empty here because it
    # contains a slash).
    if refname.endswith((b"/", b".")):
        return False
    # May not end in ".lock", contain "@{", or contain a backslash.
    rejected = refname.endswith(b".lock") or b"@{" in refname or b"\\" in refname
    return not rejected

103 

104 

def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote name and branch name.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); the branch name is everything
      after the first slash that follows the remote name.

    Raises:
      ValueError: If ref does not start with ``LOCAL_REMOTE_PREFIX`` or has
        no slash after the remote name
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    # Everything after "refs/remotes/" is "<remote>/<branch>".
    tail = ref[len(LOCAL_REMOTE_PREFIX) :]
    remote_name, slash, branch_name = tail.partition(b"/")
    if not slash:
        # No separator at all, so there is no branch part.
        raise ValueError(f"Invalid remote ref format: {ref!r}")

    return (remote_name, branch_name)

130 

131 

class RefsContainer:
    """A container for refs.

    This is the base class: storage-specific operations (``allkeys``,
    ``read_loose_ref``, ``get_packed_refs``, ``set_if_equals``, ...) raise
    NotImplementedError here and are expected to be provided by subclasses.
    """

    def __init__(self, logger=None) -> None:
        # Optional callable invoked by _log() for every logged ref change.
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Forward a ref change to the logger, if one is set.

        Nothing is logged when no logger was configured or when ``message``
        is None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, ObjectID],
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import a set of refs underneath ``base``.

        Args:
          base: Ref prefix the imported names are joined to with "/"
          other: Mapping of names (relative to base) to values; a value of
            None requests removal of that ref
          committer: Optional committer, forwarded to the set/remove calls
          timestamp: Optional timestamp, forwarded to the set/remove calls
          timezone: Optional timezone, forwarded to the set/remove calls
          message: Optional reflog message
          prune: If True, also remove every existing ref under base that is
            not set by ``other``
        """
        if prune:
            to_delete = set(self.subkeys(base))
        else:
            to_delete = set()
        for name, value in other.items():
            if value is None:
                # Explicit removal request: keep it scheduled for deletion.
                to_delete.add(name)
            else:
                self.set_if_equals(
                    b"/".join((base, name)),
                    None,
                    value,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
                # The ref was just written, so it must survive pruning.
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> Iterator[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; trailing slashes are ignored.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        # Match on a whole path component so that b"refs/heads" does not pick
        # up b"refs/headsfoo/x", and so that a trailing slash on base does not
        # cause an extra character to be stripped.
        prefix = base.rstrip(b"/")
        if prefix:
            prefix += b"/"
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base=None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                # strip() removes the leading "/" left over when base is empty
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).
        b"refs/stash" is accepted as a special case as well.

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        The loose ref is tried first; the packed refs are consulted only when
        that yields nothing.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), where refnames are the names of
            references in the chain and sha is the final contents (falsy,
            typically None, when the chain ends at a missing ref)
        Raises:
          SymrefLoop: if more than 5 refs in the chain had to be read
        """
        # Seed the loop by treating the starting name as a symref target.
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        return bool(self.read_ref(refname))

    def __getitem__(self, name) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the chain ends without a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref value
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            contents = self.read_ref(src)
            if contents is None:
                # Listed but unreadable; it cannot be a symref.
                continue
            try:
                dst = parse_symref_value(contents)
            except ValueError:
                # Not a symref.
                pass
            else:
                ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)

475 

476 

class DictRefsContainer(RefsContainer):
    """RefsContainer that keeps its refs in a caller-supplied dict.

    Packed refs are not supported (``get_packed_refs`` always returns an
    empty dict) and no locking is performed, so the container is not
    threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        super().__init__(logger=logger)
        # The mapping is used as-is, not copied.
        self._refs = refs
        # Cached peeled values, filled through _update_peeled().
        self._peeled: dict[bytes, ObjectID] = {}
        # Objects whose _notify() is called on every change.
        self._watchers: set[Any] = set()

    def allkeys(self):
        """Return the keys of the backing dict."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the stored value for name, or None if absent."""
        return self._refs.get(name)

    def get_packed_refs(self):
        """Return an empty dict; this container has no packed refs."""
        return {}

    def _notify(self, ref, newsha) -> None:
        """Tell every watcher that ref now has the value newsha."""
        event = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(event)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Store a symref from name to other, notifying watchers and logging."""
        # The value the chain currently resolves to is logged as the old value.
        _, previous = self.follow(name)
        symref_value = SYMREF + other
        self._refs[name] = symref_value
        self._notify(name, symref_value)
        self._log(
            name,
            previous,
            symref_value,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set name to new_ref if its stored value equals old_ref.

        A missing ref compares as ZERO_SHA. With old_ref of None the value is
        set unconditionally. Returns True if the ref was set.
        """
        if old_ref is not None:
            stored = self._refs.get(name, ZERO_SHA)
            if stored != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Store ref under name unless name is already present.

        Returns True if the ref was added, False if it already existed.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove name if its stored value equals old_ref.

        A missing ref compares as ZERO_SHA. With old_ref of None the ref is
        removed unconditionally. Removing an absent ref still returns True,
        but triggers no notification or log entry.
        """
        if old_ref is not None:
            stored = self._refs.get(name, ZERO_SHA)
            if stored != old_ref:
                return False
        absent = object()
        previous = self._refs.pop(name, absent)
        if previous is not absent:
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name):
        """Return the cached peeled value for name, or None."""
        return self._peeled.get(name)

    def _update(self, refs) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname in refs:
            self.set_if_equals(refname, None, refs[refname])

    def _update_peeled(self, peeled) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)

619 

620 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file.

    The file is parsed once in the constructor; none of the mutating
    operations of RefsContainer are implemented here.
    """

    def __init__(self, f) -> None:
        """Load refs from an info/refs file.

        Args:
          f: object passed to read_info_refs() to read the refs from
        """
        # Initialise the base class so that ``_logger`` exists for any
        # inherited method that calls ``_log``.
        super().__init__()
        refs = read_info_refs(f)
        # Split entries carrying the peeled suffix from the regular refs.
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self):
        """Return the names of all refs that were read."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the value read for name, or None if it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container has no packed refs."""
        return {}

    def get_peeled(self, name):
        """Return the peeled value of a ref.

        Falls back to the ref's own value when no peeled entry was read.

        Raises:
          KeyError: if name is not a known ref
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]

644 

645 

646class DiskRefsContainer(RefsContainer): 

647 """Refs container that reads refs from disk.""" 

648 

649 def __init__( 

650 self, 

651 path: Union[str, bytes, os.PathLike], 

652 worktree_path: Optional[Union[str, bytes, os.PathLike]] = None, 

653 logger=None, 

654 ) -> None: 

655 super().__init__(logger=logger) 

656 # Convert path-like objects to strings, then to bytes for Git compatibility 

657 self.path = os.fsencode(os.fspath(path)) 

658 if worktree_path is None: 

659 self.worktree_path = self.path 

660 else: 

661 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

662 self._packed_refs = None 

663 self._peeled_refs = None 

664 

665 def __repr__(self) -> str: 

666 return f"{self.__class__.__name__}({self.path!r})" 

667 

668 def subkeys(self, base): 

669 subkeys = set() 

670 path = self.refpath(base) 

671 for root, unused_dirs, files in os.walk(path): 

672 dir = root[len(path) :] 

673 if os.path.sep != "/": 

674 dir = dir.replace(os.fsencode(os.path.sep), b"/") 

675 dir = dir.strip(b"/") 

676 for filename in files: 

677 refname = b"/".join(([dir] if dir else []) + [filename]) 

678 # check_ref_format requires at least one /, so we prepend the 

679 # base before calling it. 

680 if check_ref_format(base + b"/" + refname): 

681 subkeys.add(refname) 

682 for key in self.get_packed_refs(): 

683 if key.startswith(base): 

684 subkeys.add(key[len(base) :].strip(b"/")) 

685 return subkeys 

686 

687 def allkeys(self): 

688 allkeys = set() 

689 if os.path.exists(self.refpath(HEADREF)): 

690 allkeys.add(HEADREF) 

691 path = self.refpath(b"") 

692 refspath = self.refpath(b"refs") 

693 for root, unused_dirs, files in os.walk(refspath): 

694 dir = root[len(path) :] 

695 if os.path.sep != "/": 

696 dir = dir.replace(os.fsencode(os.path.sep), b"/") 

697 for filename in files: 

698 refname = b"/".join([dir, filename]) 

699 if check_ref_format(refname): 

700 allkeys.add(refname) 

701 allkeys.update(self.get_packed_refs()) 

702 return allkeys 

703 

704 def refpath(self, name): 

705 """Return the disk path of a ref.""" 

706 if os.path.sep != "/": 

707 name = name.replace(b"/", os.fsencode(os.path.sep)) 

708 # TODO: as the 'HEAD' reference is working tree specific, it 

709 # should actually not be a part of RefsContainer 

710 if name == HEADREF: 

711 return os.path.join(self.worktree_path, name) 

712 else: 

713 return os.path.join(self.path, name) 

714 

715 def get_packed_refs(self): 

716 """Get contents of the packed-refs file. 

717 

718 Returns: Dictionary mapping ref names to SHA1s 

719 

720 Note: Will return an empty dictionary when no packed-refs file is 

721 present. 

722 """ 

723 # TODO: invalidate the cache on repacking 

724 if self._packed_refs is None: 

725 # set both to empty because we want _peeled_refs to be 

726 # None if and only if _packed_refs is also None. 

727 self._packed_refs = {} 

728 self._peeled_refs = {} 

729 path = os.path.join(self.path, b"packed-refs") 

730 try: 

731 f = GitFile(path, "rb") 

732 except FileNotFoundError: 

733 return {} 

734 with f: 

735 first_line = next(iter(f)).rstrip() 

736 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

737 for sha, name, peeled in read_packed_refs_with_peeled(f): 

738 self._packed_refs[name] = sha 

739 if peeled: 

740 self._peeled_refs[name] = peeled 

741 else: 

742 f.seek(0) 

743 for sha, name in read_packed_refs(f): 

744 self._packed_refs[name] = sha 

745 return self._packed_refs 

746 

747 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None: 

748 """Add the given refs as packed refs. 

749 

750 Args: 

751 new_refs: A mapping of ref names to targets; if a target is None that 

752 means remove the ref 

753 """ 

754 if not new_refs: 

755 return 

756 

757 path = os.path.join(self.path, b"packed-refs") 

758 

759 with GitFile(path, "wb") as f: 

760 # reread cached refs from disk, while holding the lock 

761 packed_refs = self.get_packed_refs().copy() 

762 

763 for ref, target in new_refs.items(): 

764 # sanity check 

765 if ref == HEADREF: 

766 raise ValueError("cannot pack HEAD") 

767 

768 # remove any loose refs pointing to this one -- please 

769 # note that this bypasses remove_if_equals as we don't 

770 # want to affect packed refs in here 

771 with suppress(OSError): 

772 os.remove(self.refpath(ref)) 

773 

774 if target is not None: 

775 packed_refs[ref] = target 

776 else: 

777 packed_refs.pop(ref, None) 

778 

779 write_packed_refs(f, packed_refs, self._peeled_refs) 

780 

781 self._packed_refs = packed_refs 

782 

783 def get_peeled(self, name): 

784 """Return the cached peeled value of a ref, if available. 

785 

786 Args: 

787 name: Name of the ref to peel 

788 Returns: The peeled value of the ref. If the ref is known not point to 

789 a tag, this will be the SHA the ref refers to. If the ref may point 

790 to a tag, but no cached information is available, None is returned. 

791 """ 

792 self.get_packed_refs() 

793 if self._peeled_refs is None or name not in self._packed_refs: 

794 # No cache: no peeled refs were read, or this ref is loose 

795 return None 

796 if name in self._peeled_refs: 

797 return self._peeled_refs[name] 

798 else: 

799 # Known not peelable 

800 return self[name] 

801 

802 def read_loose_ref(self, name): 

803 """Read a reference file and return its contents. 

804 

805 If the reference file a symbolic reference, only read the first line of 

806 the file. Otherwise, only read the first 40 bytes. 

807 

808 Args: 

809 name: the refname to read, relative to refpath 

810 Returns: The contents of the ref file, or None if the file does not 

811 exist. 

812 

813 Raises: 

814 IOError: if any other error occurs 

815 """ 

816 filename = self.refpath(name) 

817 try: 

818 with GitFile(filename, "rb") as f: 

819 header = f.read(len(SYMREF)) 

820 if header == SYMREF: 

821 # Read only the first line 

822 return header + next(iter(f)).rstrip(b"\r\n") 

823 else: 

824 # Read only the first 40 bytes 

825 return header + f.read(40 - len(SYMREF)) 

826 except (OSError, UnicodeError): 

827 # don't assume anything specific about the error; in 

828 # particular, invalid or forbidden paths can raise weird 

829 # errors depending on the specific operating system 

830 return None 

831 

832 def _remove_packed_ref(self, name) -> None: 

833 if self._packed_refs is None: 

834 return 

835 filename = os.path.join(self.path, b"packed-refs") 

836 # reread cached refs from disk, while holding the lock 

837 f = GitFile(filename, "wb") 

838 try: 

839 self._packed_refs = None 

840 self.get_packed_refs() 

841 

842 if name not in self._packed_refs: 

843 return 

844 

845 del self._packed_refs[name] 

846 with suppress(KeyError): 

847 del self._peeled_refs[name] 

848 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

849 f.close() 

850 finally: 

851 f.abort() 

852 

853 def set_symbolic_ref( 

854 self, 

855 name, 

856 other, 

857 committer=None, 

858 timestamp=None, 

859 timezone=None, 

860 message=None, 

861 ) -> None: 

862 """Make a ref point at another ref. 

863 

864 Args: 

865 name: Name of the ref to set 

866 other: Name of the ref to point at 

867 message: Optional message to describe the change 

868 """ 

869 self._check_refname(name) 

870 self._check_refname(other) 

871 filename = self.refpath(name) 

872 f = GitFile(filename, "wb") 

873 try: 

874 f.write(SYMREF + other + b"\n") 

875 sha = self.follow(name)[-1] 

876 self._log( 

877 name, 

878 sha, 

879 sha, 

880 committer=committer, 

881 timestamp=timestamp, 

882 timezone=timezone, 

883 message=message, 

884 ) 

885 except BaseException: 

886 f.abort() 

887 raise 

888 else: 

889 f.close() 

890 

891 def set_if_equals( 

892 self, 

893 name, 

894 old_ref, 

895 new_ref, 

896 committer=None, 

897 timestamp=None, 

898 timezone=None, 

899 message=None, 

900 ) -> bool: 

901 """Set a refname to new_ref only if it currently equals old_ref. 

902 

903 This method follows all symbolic references, and can be used to perform 

904 an atomic compare-and-swap operation. 

905 

906 Args: 

907 name: The refname to set. 

908 old_ref: The old sha the refname must refer to, or None to set 

909 unconditionally. 

910 new_ref: The new sha the refname will refer to. 

911 message: Set message for reflog 

912 Returns: True if the set was successful, False otherwise. 

913 """ 

914 self._check_refname(name) 

915 try: 

916 realnames, _ = self.follow(name) 

917 realname = realnames[-1] 

918 except (KeyError, IndexError, SymrefLoop): 

919 realname = name 

920 filename = self.refpath(realname) 

921 

922 # make sure none of the ancestor folders is in packed refs 

923 probe_ref = os.path.dirname(realname) 

924 packed_refs = self.get_packed_refs() 

925 while probe_ref: 

926 if packed_refs.get(probe_ref, None) is not None: 

927 raise NotADirectoryError(filename) 

928 probe_ref = os.path.dirname(probe_ref) 

929 

930 ensure_dir_exists(os.path.dirname(filename)) 

931 with GitFile(filename, "wb") as f: 

932 if old_ref is not None: 

933 try: 

934 # read again while holding the lock to handle race conditions 

935 orig_ref = self.read_loose_ref(realname) 

936 if orig_ref is None: 

937 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA) 

938 if orig_ref != old_ref: 

939 f.abort() 

940 return False 

941 except OSError: 

942 f.abort() 

943 raise 

944 

945 # Check if ref already has the desired value while holding the lock 

946 # This avoids fsync when ref is unchanged but still detects lock conflicts 

947 current_ref = self.read_loose_ref(realname) 

948 if current_ref is None: 

949 current_ref = packed_refs.get(realname, None) 

950 

951 if current_ref is not None and current_ref == new_ref: 

952 # Ref already has desired value, abort write to avoid fsync 

953 f.abort() 

954 return True 

955 

956 try: 

957 f.write(new_ref + b"\n") 

958 except OSError: 

959 f.abort() 

960 raise 

961 self._log( 

962 realname, 

963 old_ref, 

964 new_ref, 

965 committer=committer, 

966 timestamp=timestamp, 

967 timezone=timezone, 

968 message=message, 

969 ) 

970 return True 

971 

972 def add_if_new( 

973 self, 

974 name: bytes, 

975 ref: bytes, 

976 committer=None, 

977 timestamp=None, 

978 timezone=None, 

979 message: Optional[bytes] = None, 

980 ) -> bool: 

981 """Add a new reference only if it does not already exist. 

982 

983 This method follows symrefs, and only ensures that the last ref in the 

984 chain does not exist. 

985 

986 Args: 

987 name: The refname to set. 

988 ref: The new sha the refname will refer to. 

989 message: Optional message for reflog 

990 Returns: True if the add was successful, False otherwise. 

991 """ 

992 try: 

993 realnames, contents = self.follow(name) 

994 if contents is not None: 

995 return False 

996 realname = realnames[-1] 

997 except (KeyError, IndexError): 

998 realname = name 

999 self._check_refname(realname) 

1000 filename = self.refpath(realname) 

1001 ensure_dir_exists(os.path.dirname(filename)) 

1002 with GitFile(filename, "wb") as f: 

1003 if os.path.exists(filename) or name in self.get_packed_refs(): 

1004 f.abort() 

1005 return False 

1006 try: 

1007 f.write(ref + b"\n") 

1008 except OSError: 

1009 f.abort() 

1010 raise 

1011 else: 

1012 self._log( 

1013 name, 

1014 None, 

1015 ref, 

1016 committer=committer, 

1017 timestamp=timestamp, 

1018 timezone=timezone, 

1019 message=message, 

1020 ) 

1021 return True 

1022 

def remove_if_equals(
    self,
    name,
    old_ref,
    committer=None,
    timestamp=None,
    timezone=None,
    message=None,
) -> bool:
    """Remove a refname only if it currently equals old_ref.

    This method does not follow symbolic references. It can be used to
    perform an atomic compare-and-delete operation.

    The loose ref file (if present) and any packed entry for the name are
    removed while a lock file for the ref is held, and the deletion is
    recorded through ``self._log``. Afterwards, parent directories of the
    ref are removed for as long as ``os.rmdir`` succeeds, stopping at
    ``refs``.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer, passed through to ``self._log``
      timestamp: Optional timestamp, passed through to ``self._log``
      timezone: Optional timezone, passed through to ``self._log``
      message: Optional message
    Returns: True if the delete was carried out, False if old_ref was
      given and does not match the current value of the ref.
    """
    self._check_refname(name)
    filename = self.refpath(name)
    # the lock file lives next to the ref, so its directory must exist
    ensure_dir_exists(os.path.dirname(filename))
    f = GitFile(filename, "wb")
    try:
        if old_ref is not None:
            # prefer the loose value; fall back to the packed one, and to
            # ZERO_SHA when the ref is found in neither place
            orig_ref = self.read_loose_ref(name)
            if orig_ref is None:
                orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
            if orig_ref != old_ref:
                return False

        # remove the reference file itself
        try:
            found = os.path.lexists(filename)
        except OSError:
            # may only be packed, or otherwise unstorable
            found = False

        if found:
            os.remove(filename)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        f.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    parent = name
    while True:
        try:
            parent, _ = parent.rsplit(b"/", 1)
        except ValueError:
            # no "/" left in the name: nothing further up to clean
            break

        if parent == b"refs":
            break
        parent_filename = self.refpath(parent)
        try:
            os.rmdir(parent_filename)
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True

1103 

def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    HEAD is never packed. Refs that cannot be looked up, or whose value
    is empty, are left out.

    Args:
      all: If True, every ref is a candidate. If False, only refs under
        the tag prefix are packed.
    """
    packable: dict[Ref, Optional[ObjectID]] = {}
    for refname in self.allkeys():
        # Never pack HEAD
        if refname == HEADREF:
            continue
        if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            value = self[refname]
        except KeyError:
            # Broken ref, skip it
            continue
        if value:
            packable[refname] = value

    if packable:
        self.add_packed_refs(packable)

1126 

1127 

def _split_ref_line(line):
    """Parse one packed-refs entry into a ``(sha, name)`` tuple.

    Trailing CR/LF characters are dropped before the line is split on
    single spaces.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        fields, if the first field fails ``valid_hexsha``, or if the
        second field fails ``check_ref_format``.
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    hexsha, refname = parts
    if not valid_hexsha(hexsha):
        raise PackedRefsException(f"Invalid hex sha {hexsha!r}")
    if not check_ref_format(refname):
        raise PackedRefsException(f"invalid ref name {refname!r}")
    return hexsha, refname

1139 

1140 

def read_packed_refs(f):
    """Iterate over the entries of a packed refs file without peeled data.

    Lines starting with ``#`` are treated as comments and skipped.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a line starting with ``^`` (a peeled entry)
        is encountered, or if an entry cannot be parsed.
    """
    for entry in f:
        # A line cannot start with both markers, so the order of these
        # two checks does not matter.
        if entry.startswith(b"^"):
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        if not entry.startswith(b"#"):
            yield _split_ref_line(entry)

1155 

1156 

def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
    with SHA1s, ref names, and peeled SHA1s (or None), in that order.

    Lines starting with ``#`` are treated as comments and skipped.

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled (``^``) line has no preceding ref
        line, if a peeled sha is not a valid hex sha, or if a ref line
        cannot be parsed.
    """
    # Holds the most recent ref line until we know whether a peeled line
    # follows it.
    last = None
    for line in f:
        # Indexing bytes yields an int on Python 3, so ``line[0] == b"#"``
        # can never be true; startswith() compares bytes with bytes and is
        # also safe on an empty line.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            if not valid_hexsha(line[1:]):
                raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, line[1:])
        else:
            if last:
                # The previous ref line had no peeled companion.
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        sha, name = _split_ref_line(last)
        yield (sha, name, None)

1187 

1188 

def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
    """Serialise packed refs to a file-like object.

    Entries are written in sorted refname order. When ``peeled_refs`` is
    given (even if empty), a "pack-refs with: peeled" header line is
    written first and each ref present in ``peeled_refs`` is followed by
    a ``^<peeled sha>`` line.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    else:
        peeled_refs = {}
    for name in sorted(packed_refs):
        f.write(git_line(packed_refs[name], name))
        if name in peeled_refs:
            f.write(b"^" + peeled_refs[name] + b"\n")

1205 

1206 

def read_info_refs(f):
    """Parse tab-separated "sha<TAB>name" lines into a dict.

    Trailing CR/LF characters are removed from each line, which is then
    split at the first tab only. When a name appears more than once, the
    last line wins.

    Args:
      f: file-like object providing ``readlines()``
    Returns: dict mapping ref name to sha.
    """
    pairs = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {name: sha for (sha, name) in pairs}

1213 

1214 

def write_info_refs(refs, store: ObjectContainer):
    """Generate info refs.

    Yields one "id<TAB>name" line per ref, in sorted order, followed by
    a second line carrying the peeled id and the peeled-tag suffix when
    peeling leads to a different object. HEAD and refs whose sha is not
    in ``store`` are skipped.

    Args:
      refs: dict of ref name to sha
      store: object container used to look up and peel each sha
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for refname, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if refname == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _unpeeled, peeled = peel_sha(store, sha)
        yield b"".join((obj.id, b"\t", refname, b"\n"))
        if obj.id != peeled.id:
            yield b"".join((peeled.id, b"\t", refname, PEELED_TAG_SUFFIX, b"\n"))

1233 

1234 

def is_local_branch(x):
    """Return whether ``x`` starts with the local branch prefix."""
    prefix = LOCAL_BRANCH_PREFIX
    return x.startswith(prefix)

1237 

1238 

def strip_peeled_refs(refs):
    """Return a new dict holding every entry of ``refs`` that is not a peeled ref.

    An entry counts as peeled when its name ends with the peeled-tag
    suffix. The input mapping is not modified.
    """
    kept = {}
    for name, value in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = value
    return kept

1244 

1245 

def split_peeled_refs(refs):
    """Partition ``refs`` into regular entries and peeled entries.

    Peeled entries are those whose name ends with the peeled-tag suffix;
    they are stored under the name with that suffix removed.

    Returns: tuple ``(regular, peeled)`` of two dicts.
    """
    regular = {}
    peeled = {}
    for name, value in refs.items():
        if not name.endswith(PEELED_TAG_SUFFIX):
            regular[name] = value
            continue
        peeled[name[: -len(PEELED_TAG_SUFFIX)]] = value
    return regular, peeled

1256 

1257 

1258def _set_origin_head(refs, origin, origin_head) -> None: 

1259 # set refs/remotes/origin/HEAD 

1260 origin_base = b"refs/remotes/" + origin + b"/" 

1261 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1262 origin_ref = origin_base + HEADREF 

1263 target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :] 

1264 if target_ref in refs: 

1265 refs.set_symbolic_ref(origin_ref, target_ref) 

1266 

1267 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    When ``branch`` is given, it is looked up first as a remote-tracking
    branch of ``origin`` (in which case a local branch of the same name
    is created if it does not exist yet) and then as a tag. Otherwise
    ``origin_head`` is used and a ref of that name is created from its
    remote counterpart, if that counterpart exists.

    Args:
      refs: refs container to read from and update
      origin: name of the remote
      origin_head: ref that HEAD points at on the remote, if known
      branch: requested branch or tag name; empty to fall back to
        ``origin_head``
      ref_message: reflog message for any ref that gets created
    Returns: the selected ref name: a local branch ref, a tag ref, or
      ``origin_head``.
    Raises:
      ValueError: if ``branch`` is neither a remote branch nor a tag, or
        if neither ``branch`` nor ``origin_head`` is provided.
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            # The optional parameters of add_if_new start with committer,
            # so the message has to be passed by keyword to be logged as
            # the message rather than as the committer.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif LOCAL_TAG_PREFIX + branch in refs:
            head_ref = LOCAL_TAG_PREFIX + branch
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The remote counterpart is missing; head_ref is still
            # returned, it just is not created here.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref

1300 

1301 

def _set_head(refs, head_ref, ref_message):
    """Update HEAD to follow ``head_ref`` and return the value it resolves to.

    For a ref under the tag prefix, HEAD is deleted and then set directly
    to the value of the ref (a detached HEAD). For any other ref, HEAD is
    made a symbolic ref to ``head_ref``; if a KeyError is raised while
    doing so, None is returned.

    Args:
      refs: refs container to update
      head_ref: name of the ref HEAD should be based on
      ref_message: reflog message passed to ``set_if_equals``
    Returns: the value HEAD was set to, or None.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            return None
        return target

    # detach HEAD at specified tag
    target = refs[head_ref]
    # NOTE(review): this branch only runs if the refs lookup returns a Tag
    # object rather than a sha - confirm whether that can happen.
    if isinstance(target, Tag):
        _cls, tagged = target.object
        target = tagged.get_object(tagged).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, target, message=ref_message)
    return target

1320 

1321 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[str, str],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import the branches and tags of a remote into ``refs_container``.

    Peeled entries are dropped first. Branches are imported under
    refs/remotes/<remote_name> and tags under the local tag prefix, each
    keyed by the name with its prefix removed.

    Args:
      refs_container: container receiving the imported refs
      remote_name: name of the remote the refs came from
      refs: refs reported by the remote
      message: optional message passed to ``import_refs``
      prune: passed as ``prune`` for the branch import
      prune_tags: passed as ``prune`` for the tag import
    """
    stripped = strip_peeled_refs(refs)

    branch_cut = len(LOCAL_BRANCH_PREFIX)
    branches = {}
    for full_name, value in stripped.items():
        if full_name.startswith(LOCAL_BRANCH_PREFIX):
            branches[full_name[branch_cut:]] = value
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        branches,
        message=message,
        prune=prune,
    )

    tag_cut = len(LOCAL_TAG_PREFIX)
    tags = {}
    for full_name, value in stripped.items():
        if not full_name.startswith(LOCAL_TAG_PREFIX):
            continue
        if full_name.endswith(PEELED_TAG_SUFFIX):
            continue
        tags[full_name[tag_cut:]] = value
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )

1350 

1351 

def serialize_refs(store, refs):
    """Build a dict of ref name to object id, adding peeled entries for tags.

    Each sha is peeled through ``store``. When the unpeeled object is a
    Tag, an extra entry named with the peeled-tag suffix maps to the
    peeled object's id. Refs whose sha cannot be peeled (KeyError) are
    reported with a UserWarning and left out.

    Args:
      store: object store used to peel each sha
      refs: dict of ref name to sha
    Returns: dict of ref name to object id.
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for refname, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            text = "ref {} points at non-present sha {}".format(
                refname.decode("utf-8", "replace"), sha.decode("ascii")
            )
            warnings.warn(text, UserWarning)
            continue
        if isinstance(unpeeled, Tag):
            serialized[refname + PEELED_TAG_SUFFIX] = peeled.id
        serialized[refname] = unpeeled.id
    return serialized