Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

704 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import types 

27import warnings 

28from collections.abc import Iterable, Iterator 

29from contextlib import suppress 

30from typing import ( 

31 IO, 

32 TYPE_CHECKING, 

33 Any, 

34 BinaryIO, 

35 Callable, 

36 Optional, 

37 TypeVar, 

38 Union, 

39 cast, 

40) 

41 

42if TYPE_CHECKING: 

43 from .file import _GitFile 

44 

45from .errors import PackedRefsException, RefFormatError 

46from .file import GitFile, ensure_dir_exists 

47from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

48from .pack import ObjectContainer 

49 

# Alias used in annotations below: ref names are plain byte strings.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Prefix identifying ref contents as a symbolic reference (see
# parse_symref_value).
SYMREF = b"ref: "
# Ref-name prefixes; judging by the names these are the namespaces for local
# branches, tags, remote-tracking refs and notes.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values (as ints) that check_ref_format rejects anywhere in a ref name.
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# NOTE(review): presumably the suffix marking a peeled tag entry; its users
# are outside this section -- confirm.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

63 

64 

class SymrefLoop(Exception):
    """Raised when following symrefs does not terminate within the depth limit."""

    def __init__(self, ref: bytes, depth: int) -> None:
        """Remember which ref was being followed and how deep the walk got.

        Args:
          ref: Name of the ref whose resolution was abandoned
          depth: Number of symref hops taken before giving up
        """
        self.ref, self.depth = ref, depth

72 

73 

def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target ref name from the raw contents of a symref.

    Args:
      contents: Raw ref contents, expected to start with ``b"ref: "``
    Returns: The target name, with the prefix and any trailing CR/LF removed
    Raises:
      ValueError: If ``contents`` does not start with the symref prefix
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")

84 

85 

def check_ref_format(refname: Ref) -> bool:
    """Tell whether a refname obeys the git-check-ref-format[1] rules.

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check (without the leading ``refs/``; at least
        one ``/`` is still required)
    Returns: True if refname is valid, False otherwise
    """
    # No component may start with a dot.
    if refname.startswith(b".") or b"/." in refname:
        return False
    # At least one slash is required, and ".." is never allowed.
    if b"/" not in refname or b".." in refname:
        return False
    # Iterating bytes yields ints: reject control bytes (< 0o40) and the
    # explicitly blacklisted bytes.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # Must not end in "/", "." or ".lock".
    if refname.endswith((b"/", b".", b".lock")):
        return False
    # Neither the reflog selector "@{" nor a backslash may appear.
    return not (b"@{" in refname or b"\\" in refname)

118 

119 

def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); the branch name keeps any further
      slashes it contains

    Raises:
      ValueError: If ref does not start with ``refs/remotes/`` or has no
        ``/`` separating the remote from the branch
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    # Everything after "refs/remotes/"; the first slash ends the remote name.
    tail = ref[len(LOCAL_REMOTE_PREFIX) :]
    remote_name, slash, branch_name = tail.partition(b"/")
    if not slash:
        raise ValueError(f"Invalid remote ref format: {ref!r}")

    return (remote_name, branch_name)

145 

146 

class RefsContainer:
    """A container for refs.

    This base class implements lookup, symref following and bulk helpers on
    top of a handful of storage primitives (``allkeys``, ``read_loose_ref``,
    ``get_packed_refs``, ``set_if_equals``, ``remove_if_equals``, ...) that
    raise NotImplementedError here and are meant to be overridden.
    """

    def __init__(
        self,
        logger: Optional[
            Callable[
                [
                    bytes,
                    Optional[bytes],
                    Optional[bytes],
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    Optional[bytes],
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Initialize RefsContainer with optional logger function.

        Args:
          logger: Optional callable invoked as ``logger(ref, old_sha,
            new_sha, committer, timestamp, timezone, message)`` by ``_log``
        """
        self._logger = logger

    def _log(
        self,
        ref: bytes,
        old_sha: Optional[bytes],
        new_sha: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Forward a ref change to the logger, if there is something to log.

        Nothing happens when no logger was configured or when ``message`` is
        None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name: bytes) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation has no cache and always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, Optional[ObjectID]],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import refs from another repository.

        Args:
          base: Base ref to import into (e.g., b'refs/remotes/origin')
          other: Dictionary of refs to import; a value of None requests
            deletion of that ref under ``base``
          committer: Optional committer for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
          prune: If True, remove refs not in other
        """
        # Pass the complete reflog identity on to the primitives so that the
        # committer/timestamp/timezone given by the caller are not dropped.
        reflog = {
            "committer": committer,
            "timestamp": timestamp,
            "timezone": timezone,
            "message": message,
        }
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
            else:
                self.set_if_equals(b"/".join((base, name)), None, value, **reflog)
                # A ref that was just written must survive pruning.
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(b"/".join((base, ref)), None, **reflog)

    def allkeys(self) -> set[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self) -> Iterator[Ref]:
        """Iterate over all reference keys."""
        return iter(self.allkeys())

    def keys(self, base: Optional[bytes] = None) -> set[bytes]:
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs. When ``base`` is given the names are relative to it
            (see ``subkeys``).
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base: bytes) -> set[bytes]:
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under, with or without a trailing
            slash.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        keys = set()
        base_len = len(base)
        for refname in self.allkeys():
            if refname.startswith(base):
                # Cut exactly the base and then drop the separating slash, so
                # b"refs/heads" and b"refs/heads/" give the same result.
                keys.add(refname[base_len:].strip(b"/"))
        return keys

    def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Args:
          base: Optional base; when given, only refs under it are returned
            and the keys are relative to it
        Returns: Mapping of ref name to resolved SHA; refs that cannot be
            resolved (missing target or symref loop) are left out
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                # strip() removes the leading slash left over when base is
                # empty.
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name: bytes) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).
        ``refs/stash`` is accepted as a special case as well.

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname: bytes) -> Optional[bytes]:
        """Read a reference without following any references.

        The loose ref is consulted first; packed refs are only used when no
        loose contents are found.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        Raises:
          SymrefLoop: if more than five hops are needed
        """
        # Seed the loop as though ``name`` itself had been read from a symref.
        contents: Optional[bytes] = SYMREF + name
        depth = 0
        refnames = []
        while contents and contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname: bytes) -> bool:
        """Check if a reference exists."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name: bytes) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference cannot be resolved to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name: bytes, ref: bytes) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.

        Raises:
          ValueError: if ``ref`` is neither a valid hex sha nor a symref
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name: bytes) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self) -> dict[bytes, bytes]:
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            try:
                ref_value = self.read_ref(src)
                if ref_value is None:
                    # The ref vanished between allkeys() and read_ref(); it
                    # cannot be a symref, so skip it instead of asserting.
                    continue
                dst = parse_symref_value(ref_value)
            except ValueError:
                # Not a symbolic ref.
                continue
            ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)

541 

542 

class DictRefsContainer(RefsContainer):
    """RefsContainer that keeps its refs in a plain in-memory dict.

    There are no packed refs (``get_packed_refs`` is always empty) and the
    container is not threadsafe.
    """

    def __init__(
        self,
        refs: dict[bytes, bytes],
        logger: Optional[
            Callable[
                [
                    bytes,
                    Optional[bytes],
                    Optional[bytes],
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    Optional[bytes],
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Wrap an existing refs dictionary.

        Args:
          refs: Mapping of ref name to contents; it is used directly (not
            copied) and modified in place by the mutating methods
          logger: Optional reflog callback, passed on to RefsContainer
        """
        super().__init__(logger=logger)
        self._refs = refs
        # Peeled values, filled through _update_peeled.
        self._peeled: dict[bytes, ObjectID] = {}
        # Objects whose ``_notify((ref, newsha))`` is called on every change.
        self._watchers: set[Any] = set()

    def allkeys(self) -> set[bytes]:
        """Return the names of all stored refs."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Return the stored contents for ``name``, or None if absent."""
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Return an empty dict: this container has no packed refs."""
        return {}

    def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
        """Tell every registered watcher about a changed ref."""
        event = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(event)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
        """
        # Resolve the current value first; it is the "old" side of the log.
        previous = self.follow(name)[-1]
        symref = SYMREF + other
        self._refs[name] = symref
        self._notify(name, symref)
        self._log(
            name,
            previous,
            symref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        Only the entry stored under ``name`` is compared and written; a
        symref chain starting at ``name`` is not followed.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally. A missing ref compares as ZERO_SHA.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the set was successful, False otherwise.

        Raises:
          RefFormatError: if the comparison passed but ``name`` is not an
            acceptable refname
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Validation happens after the comparison, as before.
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the add was successful, False otherwise.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally. A missing ref compares as ZERO_SHA.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the delete was successful (including when the ref was
          already absent), False if the comparison failed.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Watchers and the logger only hear about refs that really existed.
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Return the cached peeled value for ``name``, or None if unknown."""
        return self._peeled.get(name)

    def _update(self, refs: dict[bytes, bytes]) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, value in refs.items():
            self.set_if_equals(refname, None, value)

    def _update_peeled(self, peeled: dict[bytes, bytes]) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)

765 

766 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f: BinaryIO) -> None:
        """Initialize InfoRefsContainer from info/refs file.

        Args:
          f: Binary file-like object handed to ``read_info_refs``
        """
        # Run the base initializer so that ``_logger`` exists; without it any
        # inherited code path reaching ``_log`` fails with AttributeError.
        super().__init__()
        self._refs: dict[bytes, bytes] = {}
        self._peeled: dict[bytes, bytes] = {}
        refs = read_info_refs(f)
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self) -> set[bytes]:
        """Return all reference keys."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Return the value recorded for ``name``, or None if it is unknown."""
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Return an empty dict: this container has no packed refs."""
        return {}

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Get peeled version of a reference.

        Returns: The peeled value when one was recorded, otherwise the plain
            value of the ref.
        Raises:
          KeyError: if ``name`` is not a known ref at all
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]

795 

796 

797class DiskRefsContainer(RefsContainer): 

798 """Refs container that reads refs from disk.""" 

799 

800 def __init__( 

801 self, 

802 path: Union[str, bytes, os.PathLike], 

803 worktree_path: Optional[Union[str, bytes, os.PathLike]] = None, 

804 logger: Optional[ 

805 Callable[ 

806 [ 

807 bytes, 

808 Optional[bytes], 

809 Optional[bytes], 

810 Optional[bytes], 

811 Optional[int], 

812 Optional[int], 

813 Optional[bytes], 

814 ], 

815 None, 

816 ] 

817 ] = None, 

818 ) -> None: 

819 """Initialize DiskRefsContainer.""" 

820 super().__init__(logger=logger) 

821 # Convert path-like objects to strings, then to bytes for Git compatibility 

822 self.path = os.fsencode(os.fspath(path)) 

823 if worktree_path is None: 

824 self.worktree_path = self.path 

825 else: 

826 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

827 self._packed_refs: Optional[dict[bytes, bytes]] = None 

828 self._peeled_refs: Optional[dict[bytes, bytes]] = None 

829 

830 def __repr__(self) -> str: 

831 """Return string representation of DiskRefsContainer.""" 

832 return f"{self.__class__.__name__}({self.path!r})" 

833 

834 def subkeys(self, base: bytes) -> set[bytes]: 

835 """Return subkeys under a given base reference path.""" 

836 subkeys = set() 

837 path = self.refpath(base) 

838 for root, unused_dirs, files in os.walk(path): 

839 directory = root[len(path) :] 

840 if os.path.sep != "/": 

841 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

842 directory = directory.strip(b"/") 

843 for filename in files: 

844 refname = b"/".join(([directory] if directory else []) + [filename]) 

845 # check_ref_format requires at least one /, so we prepend the 

846 # base before calling it. 

847 if check_ref_format(base + b"/" + refname): 

848 subkeys.add(refname) 

849 for key in self.get_packed_refs(): 

850 if key.startswith(base): 

851 subkeys.add(key[len(base) :].strip(b"/")) 

852 return subkeys 

853 

854 def allkeys(self) -> set[bytes]: 

855 """Return all reference keys.""" 

856 allkeys = set() 

857 if os.path.exists(self.refpath(HEADREF)): 

858 allkeys.add(HEADREF) 

859 path = self.refpath(b"") 

860 refspath = self.refpath(b"refs") 

861 for root, unused_dirs, files in os.walk(refspath): 

862 directory = root[len(path) :] 

863 if os.path.sep != "/": 

864 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

865 for filename in files: 

866 refname = b"/".join([directory, filename]) 

867 if check_ref_format(refname): 

868 allkeys.add(refname) 

869 allkeys.update(self.get_packed_refs()) 

870 return allkeys 

871 

872 def refpath(self, name: bytes) -> bytes: 

873 """Return the disk path of a ref.""" 

874 if os.path.sep != "/": 

875 name = name.replace(b"/", os.fsencode(os.path.sep)) 

876 # TODO: as the 'HEAD' reference is working tree specific, it 

877 # should actually not be a part of RefsContainer 

878 if name == HEADREF: 

879 return os.path.join(self.worktree_path, name) 

880 else: 

881 return os.path.join(self.path, name) 

882 

883 def get_packed_refs(self) -> dict[bytes, bytes]: 

884 """Get contents of the packed-refs file. 

885 

886 Returns: Dictionary mapping ref names to SHA1s 

887 

888 Note: Will return an empty dictionary when no packed-refs file is 

889 present. 

890 """ 

891 # TODO: invalidate the cache on repacking 

892 if self._packed_refs is None: 

893 # set both to empty because we want _peeled_refs to be 

894 # None if and only if _packed_refs is also None. 

895 self._packed_refs = {} 

896 self._peeled_refs = {} 

897 path = os.path.join(self.path, b"packed-refs") 

898 try: 

899 f = GitFile(path, "rb") 

900 except FileNotFoundError: 

901 return {} 

902 with f: 

903 first_line = next(iter(f)).rstrip() 

904 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

905 for sha, name, peeled in read_packed_refs_with_peeled(f): 

906 self._packed_refs[name] = sha 

907 if peeled: 

908 self._peeled_refs[name] = peeled 

909 else: 

910 f.seek(0) 

911 for sha, name in read_packed_refs(f): 

912 self._packed_refs[name] = sha 

913 return self._packed_refs 

914 

915 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None: 

916 """Add the given refs as packed refs. 

917 

918 Args: 

919 new_refs: A mapping of ref names to targets; if a target is None that 

920 means remove the ref 

921 """ 

922 if not new_refs: 

923 return 

924 

925 path = os.path.join(self.path, b"packed-refs") 

926 

927 with GitFile(path, "wb") as f: 

928 # reread cached refs from disk, while holding the lock 

929 packed_refs = self.get_packed_refs().copy() 

930 

931 for ref, target in new_refs.items(): 

932 # sanity check 

933 if ref == HEADREF: 

934 raise ValueError("cannot pack HEAD") 

935 

936 # remove any loose refs pointing to this one -- please 

937 # note that this bypasses remove_if_equals as we don't 

938 # want to affect packed refs in here 

939 with suppress(OSError): 

940 os.remove(self.refpath(ref)) 

941 

942 if target is not None: 

943 packed_refs[ref] = target 

944 else: 

945 packed_refs.pop(ref, None) 

946 

947 write_packed_refs(f, packed_refs, self._peeled_refs) 

948 

949 self._packed_refs = packed_refs 

950 

951 def get_peeled(self, name: bytes) -> Optional[bytes]: 

952 """Return the cached peeled value of a ref, if available. 

953 

954 Args: 

955 name: Name of the ref to peel 

956 Returns: The peeled value of the ref. If the ref is known not point to 

957 a tag, this will be the SHA the ref refers to. If the ref may point 

958 to a tag, but no cached information is available, None is returned. 

959 """ 

960 self.get_packed_refs() 

961 if ( 

962 self._peeled_refs is None 

963 or self._packed_refs is None 

964 or name not in self._packed_refs 

965 ): 

966 # No cache: no peeled refs were read, or this ref is loose 

967 return None 

968 if name in self._peeled_refs: 

969 return self._peeled_refs[name] 

970 else: 

971 # Known not peelable 

972 return self[name] 

973 

974 def read_loose_ref(self, name: bytes) -> Optional[bytes]: 

975 """Read a reference file and return its contents. 

976 

977 If the reference file a symbolic reference, only read the first line of 

978 the file. Otherwise, only read the first 40 bytes. 

979 

980 Args: 

981 name: the refname to read, relative to refpath 

982 Returns: The contents of the ref file, or None if the file does not 

983 exist. 

984 

985 Raises: 

986 IOError: if any other error occurs 

987 """ 

988 filename = self.refpath(name) 

989 try: 

990 with GitFile(filename, "rb") as f: 

991 header = f.read(len(SYMREF)) 

992 if header == SYMREF: 

993 # Read only the first line 

994 return header + next(iter(f)).rstrip(b"\r\n") 

995 else: 

996 # Read only the first 40 bytes 

997 return header + f.read(40 - len(SYMREF)) 

998 except (OSError, UnicodeError): 

999 # don't assume anything specific about the error; in 

1000 # particular, invalid or forbidden paths can raise weird 

1001 # errors depending on the specific operating system 

1002 return None 

1003 

    def _remove_packed_ref(self, name: bytes) -> None:
        """Drop ``name`` from the packed-refs file and from the caches.

        Does nothing when the packed-refs cache has never been loaded. If
        ``name`` turns out not to be packed, the write is aborted and the
        file is left as it was.

        Args:
          name: Name of the ref to remove from packed-refs
        """
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            # Clearing the cache forces get_packed_refs() to parse the file
            # again.
            self._packed_refs = None
            self.get_packed_refs()

            if self._packed_refs is None or name not in self._packed_refs:
                # Nothing to remove.
                # NOTE(review): abort() presumably discards the pending
                # write -- confirm against GitFile.
                f.abort()
                return

            del self._packed_refs[name]
            if self._peeled_refs is not None:
                # The ref may have no peeled entry; that is fine.
                with suppress(KeyError):
                    del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        except BaseException:
            # Any failure, including interrupts, aborts the write before the
            # exception is re-raised.
            f.abort()
            raise

1027 

def set_symbolic_ref(
    self,
    name: bytes,
    other: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> None:
    """Turn ``name`` into a symbolic ref pointing at ``other``.

    Both ref names are validated first. The new content is written and a
    reflog entry is recorded; if either step fails the write is aborted
    and the exception is re-raised.

    Args:
      name: Name of the ref to set
      other: Name of the ref to point at
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message to describe the change
    """
    for refname in (name, other):
        self._check_refname(refname)
    lockfile = GitFile(self.refpath(name), "wb")
    try:
        lockfile.write(SYMREF + other + b"\n")
        # The same value is logged as both the old and the new SHA.
        resolved = self.follow(name)[-1]
        self._log(
            name,
            resolved,
            resolved,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    except BaseException:
        lockfile.abort()
        raise
    lockfile.close()

1068 

def set_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    new_ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Set a refname to new_ref only if it currently equals old_ref.

    This method follows all symbolic references, and can be used to perform
    an atomic compare-and-swap operation.

    Args:
      name: The refname to set.
      old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
      new_ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Set message for reflog
    Returns: True if the set was successful (including when the ref already
      held new_ref and nothing was written), False if the current value did
      not match old_ref.

    Raises:
      NotADirectoryError: if an ancestor path of the ref is itself a
        packed ref
      OSError: re-raised after aborting the write if reading the current
        value or writing the new one fails
    """
    self._check_refname(name)
    try:
        realnames, _ = self.follow(name)
        realname = realnames[-1]
    except (KeyError, IndexError, SymrefLoop):
        # The chain could not be resolved; operate on the name as given.
        realname = name
    filename = self.refpath(realname)

    # make sure none of the ancestor folders is in packed refs
    probe_ref = os.path.dirname(realname)
    packed_refs = self.get_packed_refs()
    while probe_ref:
        if packed_refs.get(probe_ref, None) is not None:
            raise NotADirectoryError(filename)
        probe_ref = os.path.dirname(probe_ref)

    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as f:
        if old_ref is not None:
            try:
                # read again while holding the lock to handle race conditions
                orig_ref = self.read_loose_ref(realname)
                if orig_ref is None:
                    # No loose value: fall back to the packed value, and to
                    # ZERO_SHA when the ref is not packed either.
                    orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                if orig_ref != old_ref:
                    f.abort()
                    return False
            except OSError:
                f.abort()
                raise

        # Check if ref already has the desired value while holding the lock
        # This avoids fsync when ref is unchanged but still detects lock conflicts
        current_ref = self.read_loose_ref(realname)
        if current_ref is None:
            current_ref = packed_refs.get(realname, None)

        if current_ref is not None and current_ref == new_ref:
            # Ref already has desired value, abort write to avoid fsync
            f.abort()
            return True

        try:
            f.write(new_ref + b"\n")
        except OSError:
            f.abort()
            raise
        # The reflog entry is recorded under the resolved name, not `name`.
        self._log(
            realname,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True

1152 

def add_if_new(
    self,
    name: bytes,
    ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Add a new reference only if it does not already exist.

    This method follows symrefs, and only ensures that the last ref in the
    chain does not exist.

    Args:
      name: The refname to set.
      ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message for reflog
    Returns: True if the add was successful, False otherwise.

    Raises:
      OSError: re-raised after aborting the write if writing fails
    """
    try:
        realnames, contents = self.follow(name)
        if contents is not None:
            return False
        realname = realnames[-1]
    except (KeyError, IndexError):
        # The chain could not be resolved; create the ref under `name`.
        realname = name
    self._check_refname(realname)
    filename = self.refpath(realname)
    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as f:
        # Re-check existence now that the file is open for writing. Both
        # checks must look at the ref that is about to be created, i.e. the
        # end of the symref chain, not the name the caller passed in.
        if os.path.exists(filename) or realname in self.get_packed_refs():
            f.abort()
            return False
        try:
            f.write(ref + b"\n")
        except OSError:
            f.abort()
            raise
        else:
            self._log(
                name,
                None,
                ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
    return True

1206 

def remove_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Delete a ref, provided it currently holds ``old_ref``.

    Symbolic references are not followed, so this can serve as an atomic
    compare-and-delete. Both the loose file and any packed entry are
    removed, and parent directories left empty are cleaned up afterwards.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    filename = self.refpath(name)
    ensure_dir_exists(os.path.dirname(filename))
    lock = GitFile(filename, "wb")
    try:
        if old_ref is not None:
            current = self.read_loose_ref(name)
            if current is None:
                current = self.get_packed_refs().get(name, ZERO_SHA)
            if current != old_ref:
                return False

        # Remove the loose file, if there is one.
        try:
            loose_exists = os.path.lexists(filename)
        except OSError:
            # may only be packed, or otherwise unstorable
            loose_exists = False
        if loose_exists:
            os.remove(filename)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # Nothing is ever written through this handle; it only provides
        # the lock.
        lock.abort()

    # With the lock released, prune parent directories that may now be
    # empty, so that a ref named like one of those directories can be
    # created later on.
    parent = name
    while b"/" in parent:
        parent = parent.rsplit(b"/", 1)[0]
        if parent == b"refs":
            break
        parent_filename = self.refpath(parent)
        try:
            os.rmdir(parent_filename)
        except OSError:
            # The directory may be non-empty or already removed by another
            # process. The ref itself is gone, so this is not fatal.
            break

    return True

1290 

def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    Args:
      all: If True, pack all refs. If False, only pack tags.
    """
    pending: dict[Ref, Optional[ObjectID]] = {}
    for refname in self.allkeys():
        # HEAD is never packed.
        if refname == HEADREF:
            continue
        if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            value = self[refname]
        except KeyError:
            # Broken ref, leave it alone.
            continue
        if value:
            pending[refname] = value

    if pending:
        self.add_packed_refs(pending)

1313 

1314 

def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
    """Parse one ``<sha> <refname>`` line into a ``(sha, name)`` tuple.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the SHA is not valid hex, or the ref name
        fails check_ref_format.
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    sha, name = parts
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(name):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (sha, name)

1326 

1327 

def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
    """Iterate over the entries of a packed refs file without peeled lines.

    Lines starting with ``#`` are skipped as comments; a line starting
    with ``^`` is rejected because this reader does not handle peeled refs.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    """
    for raw in f:
        marker = raw[:1]
        if marker == b"#":
            # Comment
            continue
        if marker == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(raw)

1342 

1343 

def read_packed_refs_with_peeled(
    f: IO[bytes],
) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of SHA1, ref name, and peeled SHA1 (or None when the ref line is
    not followed by a ``^`` line). Comment lines starting with ``#`` are
    skipped wherever they appear.

    Args:
      f: file-like object to read from, seek'ed to the second line

    Raises:
      PackedRefsException: if a ``^`` line has no preceding ref line, carries
        an invalid hex SHA, or a ref line is malformed
    """
    # The most recent ref line, held back until we know whether a peeled
    # ("^") line follows it.
    last = None
    for line in f:
        # Indexing bytes yields an int on Python 3, so `line[0] == b"#"`
        # is never true; compare a prefix instead.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            peeled = line[1:]
            if not valid_hexsha(peeled):
                raise PackedRefsException(f"Invalid hex sha {peeled!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, peeled)
        else:
            if last:
                # The previous ref had no peeled line.
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        sha, name = _split_ref_line(last)
        yield (sha, name, None)

1376 

1377 

def write_packed_refs(
    f: IO[bytes],
    packed_refs: dict[bytes, bytes],
    peeled_refs: Optional[dict[bytes, bytes]] = None,
) -> None:
    """Serialize packed refs, sorted by ref name, to a file.

    When ``peeled_refs`` is given (even if empty) the
    "# pack-refs with: peeled" header is written first, and each ref that
    has a peeled value is followed by a ``^<peeled sha>`` line.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    peeled = peeled_refs
    if peeled is not None:
        f.write(b"# pack-refs with: peeled\n")
    else:
        peeled = {}
    for refname, sha in sorted(packed_refs.items()):
        f.write(git_line(sha, refname))
        if refname in peeled:
            f.write(b"^" + peeled[refname] + b"\n")

1398 

1399 

def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
    """Read info/refs file.

    Each non-empty line is expected to be ``<sha>\\t<refname>``. Blank lines
    are ignored.

    Args:
      f: File-like object to read from

    Returns:
      Dictionary mapping ref names to SHA1s

    Raises:
      ValueError: if a non-empty line contains no tab separator
    """
    ret = {}
    # Iterate lazily rather than materialising every line with readlines().
    for line in f:
        line = line.rstrip(b"\r\n")
        if not line:
            # A blank line would otherwise fail the two-way unpack below.
            continue
        sha, name = line.split(b"\t", 1)
        ret[name] = sha
    return ret

1414 

1415 

def write_info_refs(
    refs: dict[bytes, bytes], store: ObjectContainer
) -> Iterator[bytes]:
    """Yield the lines of an info/refs file for ``refs``.

    Refs are emitted in sorted name order as ``<sha>\\t<name>\\n``. HEAD and
    refs whose object is missing from ``store`` are left out. When peeling
    yields a different object, an extra line for the peeled id is emitted
    under the name suffixed with PEELED_TAG_SUFFIX.
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for name in sorted(refs):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name == HEADREF:
            continue
        sha = refs[name]
        try:
            obj = store[sha]
        except KeyError:
            continue
        _unpeeled, peeled = peel_sha(store, sha)
        yield obj.id + b"\t" + name + b"\n"
        if obj.id != peeled.id:
            yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"

1436 

1437 

def is_local_branch(x: bytes) -> bool:
    """Check if a ref name is a local branch.

    Args:
      x: Ref name to test

    Returns:
      True if ``x`` starts with LOCAL_BRANCH_PREFIX, False otherwise
    """
    return x.startswith(LOCAL_BRANCH_PREFIX)

1441 

1442 

# Type variable for the ref-dictionary helpers in this module. It is
# constrained to the two dict flavours they accept: values that are always
# SHAs, or values that may be None.
T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])

1444 

1445 

def strip_peeled_refs(refs: T) -> T:
    """Return a copy of ``refs`` without entries ending in PEELED_TAG_SUFFIX."""
    kept = {}
    for refname, value in refs.items():
        if refname.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[refname] = value
    return kept

1451 

1452 

def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
    """Separate peeled entries from regular refs.

    Returns:
      A ``(regular, peeled)`` pair. ``regular`` holds every entry whose name
      does not end in PEELED_TAG_SUFFIX. ``peeled`` maps the name with that
      suffix removed to the peeled SHA, skipping entries whose value is None.
    """
    regular = {}
    peeled: dict[bytes, bytes] = {}
    for ref, sha in refs.items():
        if not ref.endswith(PEELED_TAG_SUFFIX):
            regular[ref] = sha
        elif sha is not None:
            # Key the peeled value by the ref name without the suffix.
            peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
    return regular, peeled

1465 

1466 

def _set_origin_head(
    refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
) -> None:
    """Point ``refs/remotes/<origin>/HEAD`` at the remote's default branch.

    Nothing is done unless ``origin_head`` is a local branch ref whose
    remote-tracking counterpart already exists in ``refs``.
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"
    if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    branch_name = origin_head[len(LOCAL_BRANCH_PREFIX) :]
    symref_name = remote_prefix + HEADREF
    tracking_ref = remote_prefix + branch_name
    if tracking_ref in refs:
        refs.set_symbolic_ref(symref_name, tracking_ref)

1477 

1478 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    Args:
      refs: Refs container to update
      origin: Remote name, used to build ``refs/remotes/<origin>/``
      origin_head: Ref the remote's HEAD points at, if known
      branch: Explicitly requested branch (or tag) name; takes precedence
        over ``origin_head`` when non-empty
      ref_message: Reflog message for any ref that gets created

    Returns:
      The ref that HEAD should be set to

    Raises:
      ValueError: if ``branch`` names neither a remote-tracking branch nor
        a local tag, or if neither ``branch`` nor ``origin_head`` is given
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            # Pass the message by keyword: the third positional parameter
            # of add_if_new is the committer, not the reflog message.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif LOCAL_TAG_PREFIX + branch in refs:
            head_ref = LOCAL_TAG_PREFIX + branch
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # Deliberately best-effort: if the source ref is missing, no
            # local ref is created, but head_ref is still returned.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref

1511 

1512 

def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
) -> Optional[bytes]:
    """Update HEAD to ``head_ref`` and return the value it resolves to.

    For a tag ref, HEAD is removed and rewritten as a detached value. For
    any other ref, HEAD is made symbolic and then set to the ref's value;
    a KeyError anywhere in that sequence results in None being returned.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            return None
        return target

    # detach HEAD at specified tag
    detached = refs[head_ref]
    if isinstance(detached, Tag):
        _cls, obj = detached.object
        detached = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, detached, message=ref_message)
    return detached

1533 

1534 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, Optional[bytes]],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into ``refs_container``.

    Branches are imported under ``refs/remotes/<remote_name>`` and tags
    under LOCAL_TAG_PREFIX. Peeled entries and refs whose value is None are
    left out.
    """
    stripped_refs = strip_peeled_refs(refs)

    branches = {}
    for refname, sha in stripped_refs.items():
        if refname.startswith(LOCAL_BRANCH_PREFIX) and sha is not None:
            branches[refname[len(LOCAL_BRANCH_PREFIX) :]] = sha
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        branches,
        message=message,
        prune=prune,
    )

    tags = {}
    for refname, sha in stripped_refs.items():
        if not refname.startswith(LOCAL_TAG_PREFIX):
            continue
        if refname.endswith(PEELED_TAG_SUFFIX) or sha is None:
            continue
        tags[refname[len(LOCAL_TAG_PREFIX) :]] = sha
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )

1565 

1566 

def serialize_refs(
    store: ObjectContainer, refs: dict[bytes, bytes]
) -> dict[bytes, bytes]:
    """Build a ref dictionary that also carries peeled entries.

    Refs whose SHA cannot be peeled because it is missing from ``store``
    are dropped with a UserWarning.

    Args:
      store: Object store to peel refs from
      refs: Dictionary of ref names to SHAs

    Returns:
      Dictionary with refs and peeled refs (marked with ^{})
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    result = {}
    for ref, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            ref_text = ref.decode("utf-8", "replace")
            sha_text = sha.decode("ascii")
            warnings.warn(
                f"ref {ref_text} points at non-present sha {sha_text}",
                UserWarning,
            )
            continue
        if isinstance(unpeeled, Tag):
            result[ref + PEELED_TAG_SUFFIX] = peeled.id
        result[ref] = unpeeled.id
    return result

1599 

1600 

class locked_ref:
    """Lock a ref while making modifications.

    Works as a context manager. On a clean exit the pending content is
    committed only if ``set()`` or ``set_symbolic_ref()`` was called and the
    ref was not subsequently deleted; in every other case the pending write
    is aborted, so a session that only reads the ref leaves it untouched.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        """Initialize a locked ref.

        Args:
          refs_container: The DiskRefsContainer to lock the ref in
          refname: The ref name to lock
        """
        self._refs_container = refs_container
        self._refname = refname
        self._file: Optional[_GitFile] = None
        self._realname: Optional[Ref] = None
        self._deleted = False
        # True once new content has been written to the pending file.
        self._written = False

    def __enter__(self) -> "locked_ref":
        """Enter the context manager and acquire the lock.

        Returns:
          This locked_ref instance

        Raises:
          OSError: If the lock cannot be acquired
        """
        self._refs_container._check_refname(self._refname)
        try:
            realnames, _ = self._refs_container.follow(self._refname)
            self._realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            # The chain could not be resolved; lock the name as given.
            self._realname = self._refname

        filename = self._refs_container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        self._file = f
        # Start every session with clean state, in case the instance is
        # reused.
        self._deleted = False
        self._written = False
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        """Exit the context manager and release the lock.

        Args:
          exc_type: Type of exception if one occurred
          exc_value: Exception instance if one occurred
          traceback: Traceback if an exception occurred
        """
        f = self._file
        if f is None:
            return
        # Drop the handle first so that later calls to get()/set()/delete()
        # report "not in context" instead of touching a finished file.
        self._file = None
        if exc_type is None and self._written and not self._deleted:
            f.close()
        else:
            # Nothing to commit: committing an untouched pending file would
            # replace the ref with empty content.
            f.abort()

    def get(self) -> Optional[bytes]:
        """Get the current value of the ref.

        Returns:
          The loose value of the ref if there is one, otherwise its packed
          value, otherwise None

        Raises:
          RuntimeError: If called outside the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        assert self._realname is not None
        current_ref = self._refs_container.read_loose_ref(self._realname)
        if current_ref is None:
            current_ref = self._refs_container.get_packed_refs().get(
                self._realname, None
            )
        return current_ref

    def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
        """Ensure the ref currently equals the expected value.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        current_value = self.get()
        return current_value == expected_value

    def set(self, new_ref: bytes) -> None:
        """Set the ref to a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value

        Raises:
          RuntimeError: If called outside the context manager
          ValueError: If new_ref is neither a valid hex SHA nor a symref
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Discard anything written earlier in this session.
        self._file.seek(0)
        self._file.truncate()
        self._file.write(new_ref + b"\n")
        self._deleted = False
        self._written = True

    def set_symbolic_ref(self, target: Ref) -> None:
        """Make this ref point at another ref.

        Args:
          target: Name of the ref to point at

        Raises:
          RuntimeError: If called outside the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        self._refs_container._check_refname(target)
        # Discard anything written earlier in this session.
        self._file.seek(0)
        self._file.truncate()
        self._file.write(SYMREF + target + b"\n")
        self._deleted = False
        self._written = True

    def delete(self) -> None:
        """Delete the ref file while holding the lock.

        Raises:
          RuntimeError: If called outside the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        # Delete the actual ref file while holding the lock
        if self._realname:
            filename = self._refs_container.refpath(self._realname)
            try:
                if os.path.lexists(filename):
                    os.remove(filename)
            except FileNotFoundError:
                # Removed by someone else between the check and the remove.
                pass
            self._refs_container._remove_packed_ref(self._realname)

        self._deleted = True

1733 

1734 

def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
    """Filter refs to only include those with a given prefix.

    Args:
      refs: A dictionary of refs.
      prefixes: The prefixes to filter by. Any iterable is accepted,
        including one-shot iterators.

    Returns:
      A new dictionary with the entries of ``refs`` whose name starts with
      at least one of ``prefixes``
    """
    # Materialise once: the prefixes are checked against every key, and a
    # generator would be exhausted after the first key. startswith()
    # accepts a tuple and returns False for an empty one.
    wanted = tuple(prefixes)
    filtered = {k: v for k, v in refs.items() if k.startswith(wanted)}
    return cast(T, filtered)