Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

719 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import types 

27import warnings 

28from collections.abc import Iterable, Iterator, Mapping 

29from contextlib import suppress 

30from typing import ( 

31 IO, 

32 TYPE_CHECKING, 

33 Any, 

34 BinaryIO, 

35 Callable, 

36 Optional, 

37 TypeVar, 

38 Union, 

39) 

40 

41if TYPE_CHECKING: 

42 from .file import _GitFile 

43 

44from .errors import PackedRefsException, RefFormatError 

45from .file import GitFile, ensure_dir_exists 

46from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

47from .pack import ObjectContainer 

48 

# Ref names are handled as raw bytes throughout this module.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Marker that starts the contents of a symbolic ref (see parse_symref_value).
SYMREF = b"ref: "
# Ref name prefixes, one per namespace.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values rejected anywhere in a ref name by check_ref_format.
# Iterating bytes yields ints, so this is a set of ints.
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# NOTE(review): presumably the suffix marking the peeled value of a tag in
# ref listings -- confirm against the readers/writers that use it.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

62 

63 

class SymrefLoop(Exception):
    """Signals that a chain of symrefs could not be resolved (loop)."""

    def __init__(self, ref: bytes, depth: int) -> None:
        """Remember which ref was being resolved and how deep we got.

        Args:
          ref: Name of the ref whose resolution was attempted
          depth: Number of symref hops followed before giving up
        """
        self.ref, self.depth = ref, depth

71 

72 

def parse_symref_value(contents: bytes) -> bytes:
    """Extract the destination ref name from raw symref contents.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The ref name following the marker, with trailing CR/LF removed

    Raises:
      ValueError: If contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF) :]
    return destination.rstrip(b"\r\n")

83 

84 

def check_ref_format(refname: Ref) -> bool:
    """Check if a refname is correctly formatted.

    Follows the rules of git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # These could be combined into one big expression, but are listed
    # separately to parallel [1].
    if b"/." in refname or refname.startswith(b"."):
        return False
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # Iterating bytes yields ints: reject ASCII control characters and the
    # explicitly forbidden characters.
    if any(c < 0o40 or c in BAD_REF_CHARS for c in refname):
        return False
    # refname is non-empty here because it contains at least one slash.
    if refname.endswith((b"/", b".")):
        return False
    # No slash-separated component may end in ".lock", not only the last one.
    if refname.endswith(b".lock") or b".lock/" in refname:
        return False
    if b"@{" in refname:
        return False
    if b"\\" in refname:
        return False
    return True

117 

118 

def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Parse a remote ref into remote name and branch name.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name)

    Raises:
      ValueError: If ref is not a valid remote ref, including when the
        remote name or the branch name is empty
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    # Remove the prefix
    remainder = ref[len(LOCAL_REMOTE_PREFIX) :]

    # Split at the first slash only: branch names may themselves contain "/".
    remote_name, sep, branch_name = remainder.partition(b"/")
    # A missing slash, or an empty part on either side of it, is not a
    # usable (remote, branch) pair.
    if not sep or not remote_name or not branch_name:
        raise ValueError(f"Invalid remote ref format: {ref!r}")

    return (remote_name, branch_name)

144 

145 

class RefsContainer:
    """A container for refs.

    Base class: storage-specific operations raise NotImplementedError and
    are expected to be provided by subclasses.
    """

    def __init__(
        self,
        logger: Optional[
            Callable[
                [
                    bytes,
                    bytes,
                    bytes,
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    bytes,
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Initialize RefsContainer with optional logger function.

        Args:
          logger: Optional callable invoked by ``_log`` as
            ``logger(ref, old_sha, new_sha, committer, timestamp, timezone,
            message)``
        """
        self._logger = logger

    def _log(
        self,
        ref: bytes,
        old_sha: Optional[bytes],
        new_sha: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Forward a ref change to the logger, if one is configured.

        Nothing is logged when there is no logger or when message is None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        # Use ZERO_SHA for None values, matching git behavior
        if old_sha is None:
            old_sha = ZERO_SHA
        if new_sha is None:
            new_sha = ZERO_SHA
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name: bytes) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: Mapping[Ref, Optional[ObjectID]],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import refs from another repository.

        Args:
          base: Base ref to import into (e.g., b'refs/remotes/origin')
          other: Mapping of ref names (relative to base) to SHAs; a value of
            None requests deletion of that ref
          committer: Optional committer for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
          prune: If True, remove refs under base that are not in other
        """
        to_delete: set[bytes] = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                # Explicit deletion request: must survive until the loop below.
                to_delete.add(name)
                continue
            self.set_if_equals(
                b"/".join((base, name)),
                None,
                value,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
            # The ref was just (re)written, so pruning must not remove it.
            to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> set[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self) -> Iterator[Ref]:
        """Iterate over all reference keys."""
        return iter(self.allkeys())

    def keys(self, base: Optional[bytes] = None) -> set[bytes]:
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base: bytes) -> set[bytes]:
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; a trailing slash is optional.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        # Normalise to exactly one trailing slash so that b"refs/heads" and
        # b"refs/heads/" behave the same, and so that a sibling such as
        # b"refs/headsfoo/x" is not mistaken for a child of b"refs/heads".
        prefix = base.rstrip(b"/") + b"/"
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Args:
          base: Optional base; when given, keys are relative to it
        Returns: Mapping of ref name to resolved SHA; refs that cannot be
            resolved are left out
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name: bytes) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD (or refs/stash) and is
            otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname: bytes) -> Optional[bytes]:
        """Read a reference without following any references.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            # Fall back to the packed refs when there is no loose ref.
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain

        Raises:
          SymrefLoop: if more than five symref hops are needed
        """
        contents: Optional[bytes] = SYMREF + name
        depth = 0
        refnames = []
        while contents and contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname: bytes) -> bool:
        """Check if a reference exists."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name: bytes) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the name does not resolve to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name: bytes, ref: bytes) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.

        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name: bytes) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self) -> dict[bytes, bytes]:
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            ref_value = self.read_ref(src)
            if ref_value is None:
                # Listed but unreadable (e.g. removed in the meantime): skip
                # it instead of relying on an assert that -O strips.
                continue
            try:
                dst = parse_symref_value(ref_value)
            except ValueError:
                # Not a symref.
                continue
            ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
            all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)

545 

546 

class DictRefsContainer(RefsContainer):
    """In-memory RefsContainer that stores refs in a plain dict.

    There are no packed refs here (get_packed_refs is always empty), and the
    container is not threadsafe.
    """

    def __init__(
        self,
        refs: dict[bytes, bytes],
        logger: Optional[
            Callable[
                [
                    bytes,
                    Optional[bytes],
                    Optional[bytes],
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    Optional[bytes],
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Wrap the given refs dict, optionally reporting changes to logger.

        The dict is used as-is (not copied), so updates made through this
        container are visible in it.
        """
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self) -> set[bytes]:
        """Return the names of all refs held in the dict."""
        return set(self._refs)

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Return the stored value for name, or None when it is absent."""
        return self._refs.get(name)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Return an empty dict: this container has no packed refs."""
        return {}

    def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
        """Tell every registered watcher about a (ref, new value) change."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
        """
        # Resolve the current value first so the log entry records it.
        _, previous = self.follow(name)
        target = SYMREF + other
        self._refs[name] = target
        self._notify(name, target)
        self._log(
            name,
            previous,
            target,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        Only the named entry is compared and written; symbolic refs are not
        followed.

        Args:
          name: The refname to set.
          old_ref: The value the refname must currently hold, or None to set
            unconditionally. A missing ref compares as ZERO_SHA.
          new_ref: The new value the refname will hold.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the set was successful, False otherwise.

        Raises:
          RefFormatError: if name fails _check_refname
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the add was successful, False otherwise.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        Symbolic refs are not followed. Removing a ref that is already absent
        is reported as success, without notifying watchers or logging.

        Args:
          name: The refname to delete.
          old_ref: The value the refname must currently hold, or None to
            delete unconditionally. A missing ref compares as ZERO_SHA.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the delete was successful, False otherwise.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Return the cached peeled value for name, or None if not cached."""
        return self._peeled.get(name)

    def _update(self, refs: Mapping[bytes, bytes]) -> None:
        """Set several refs unconditionally; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, value in refs.items():
            self.set_if_equals(refname, None, value)

    def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None:
        """Merge entries into the peeled cache; intended only for testing."""
        self._peeled.update(peeled)

769 

770 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f: BinaryIO) -> None:
        """Initialize InfoRefsContainer from info/refs file.

        Args:
          f: Binary file object to read the info/refs data from
        """
        # Run the base initialiser so that attributes it sets up (the
        # logger) exist on this instance as well.
        super().__init__()
        self._refs: dict[bytes, bytes] = {}
        self._peeled: dict[bytes, bytes] = {}
        refs = read_info_refs(f)
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self) -> set[bytes]:
        """Return all reference keys."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a reference, returning None if it is not known."""
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Get packed references (always empty for this container)."""
        return {}

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Get peeled version of a reference.

        Returns: The peeled value when one was read, otherwise the ref's own
            value.

        Raises:
          KeyError: if name is not a known ref
        """
        try:
            return self._peeled[name]
        except KeyError:
            # No peeled entry: fall back to the ref's own value.
            return self._refs[name]

799 

800 

801class DiskRefsContainer(RefsContainer): 

802 """Refs container that reads refs from disk.""" 

803 

def __init__(
    self,
    path: Union[str, bytes, os.PathLike[str]],
    worktree_path: Optional[Union[str, bytes, os.PathLike[str]]] = None,
    logger: Optional[
        Callable[
            [
                bytes,
                bytes,
                bytes,
                Optional[bytes],
                Optional[int],
                Optional[int],
                bytes,
            ],
            None,
        ]
    ] = None,
) -> None:
    """Set up a refs container rooted at path.

    Args:
      path: Directory holding the refs; stored as bytes
      worktree_path: Optional separate directory for per-worktree refs;
        defaults to path
      logger: Optional logger callable passed on to RefsContainer
    """
    super().__init__(logger=logger)
    # Paths are kept as bytes so they can be joined with bytes ref names.
    self.path = os.fsencode(os.fspath(path))
    self.worktree_path = (
        self.path
        if worktree_path is None
        else os.fsencode(os.fspath(worktree_path))
    )
    # Both caches stay None until get_packed_refs() populates them.
    self._packed_refs: Optional[dict[bytes, bytes]] = None
    self._peeled_refs: Optional[dict[bytes, bytes]] = None

833 

834 def __repr__(self) -> str: 

835 """Return string representation of DiskRefsContainer.""" 

836 return f"{self.__class__.__name__}({self.path!r})" 

837 

def _iter_dir(
    self,
    path: bytes,
    base: bytes,
    dir_filter: Optional[Callable[[bytes], bool]] = None,
) -> Iterator[bytes]:
    """Yield well-formed ref names found as files below path/base.

    Args:
      path: Root directory; yielded names are relative to it
      base: Ref prefix (sub-directory of path) to walk
      dir_filter: Optional predicate called with ``b"<dir>/<sub>/"``;
        sub-directories for which it returns False are not descended into
    """
    walk_root = os.path.join(path, base.rstrip(b"/"))
    # Length of "path/" so the root can be sliced off every walked directory.
    strip_len = len(os.path.join(path, b""))
    native_sep_differs = os.path.sep != "/"

    for root, dirs, files in os.walk(walk_root):
        rel_dir = root[strip_len:]
        if native_sep_differs:
            # Ref names always use "/" regardless of the OS separator.
            rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
        if dir_filter is not None:
            # In-place assignment so os.walk skips the rejected directories.
            dirs[:] = [
                sub for sub in dirs if dir_filter(b"/".join([rel_dir, sub, b""]))
            ]

        for filename in files:
            candidate = rel_dir + b"/" + filename
            if check_ref_format(candidate):
                yield candidate

860 

def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
    """Yield the names of loose refs under base.

    Depending on base, refs are read from the common directory, the
    worktree directory, or both.
    """
    base = base.rstrip(b"/") + b"/"
    search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]]
    if base != b"refs/":
        # A specific namespace lives entirely in one of the two directories.
        root = self.worktree_path if is_per_worktree_ref(base) else self.path
        search_paths = [(root, None)]
    elif self.worktree_path == self.path:
        # Iterate through all the refs from the main worktree
        search_paths = [(self.path, None)]
    else:
        search_paths = [
            # Shared refs from the commondir, excluding per-worktree refs
            (self.path, lambda r: not is_per_worktree_ref(r)),
            # Per-worktree refs from the worktree's gitdir
            (self.worktree_path, is_per_worktree_ref),
        ]

    for root, dir_filter in search_paths:
        yield from self._iter_dir(root, base, dir_filter=dir_filter)

878 

def subkeys(self, base: bytes) -> set[bytes]:
    """Return ref names under base, loose and packed, with base stripped."""
    offset = len(base)
    loose = {
        key[offset:].strip(b"/")
        for key in self._iter_loose_refs(base)
        if key.startswith(base)
    }
    packed = {
        key[offset:].strip(b"/")
        for key in self.get_packed_refs()
        if key.startswith(base)
    }
    return loose | packed

891 

def allkeys(self) -> set[bytes]:
    """Return every ref name: HEAD (if its file exists), loose and packed."""
    found = {HEADREF} if os.path.exists(self.refpath(HEADREF)) else set()
    found.update(self._iter_loose_refs())
    found.update(self.get_packed_refs())
    return found

901 

def refpath(self, name: bytes) -> bytes:
    """Return the on-disk path of the ref called name."""
    # Per-worktree refs live under the worktree directory, others under path.
    root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
    if os.path.sep == "/":
        relative = name
    else:
        # Ref names use "/"; translate to the native separator.
        relative = name.replace(b"/", os.fsencode(os.path.sep))
    return os.path.join(root_dir, relative)

910 

def get_packed_refs(self) -> dict[bytes, bytes]:
    """Get contents of the packed-refs file.

    The file is parsed on first use and the result is cached on the
    instance.

    Returns: Dictionary mapping ref names to SHA1s

    Note: Will return an empty dictionary when no packed-refs file is
        present or when the file is empty.
    """
    # TODO: invalidate the cache on repacking
    if self._packed_refs is None:
        # set both to empty because we want _peeled_refs to be
        # None if and only if _packed_refs is also None.
        self._packed_refs = {}
        self._peeled_refs = {}
        path = os.path.join(self.path, b"packed-refs")
        try:
            f = GitFile(path, "rb")
        except FileNotFoundError:
            return {}
        with f:
            # Default to b"" so an empty file is treated as "no refs"
            # instead of letting StopIteration escape.
            first_line = next(iter(f), b"").rstrip()
            if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
                # Header consumed; the parser continues from the next line.
                for sha, name, peeled in read_packed_refs_with_peeled(f):
                    self._packed_refs[name] = sha
                    if peeled:
                        self._peeled_refs[name] = peeled
            else:
                # No peeled header: re-read from the start.
                f.seek(0)
                for sha, name in read_packed_refs(f):
                    self._packed_refs[name] = sha
    return self._packed_refs

942 

def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
    """Add the given refs as packed refs.

    Args:
      new_refs: A mapping of ref names to targets; if a target is None that
        means remove the ref

    Raises:
      ValueError: if new_refs contains HEAD; nothing is modified in that
        case
    """
    if not new_refs:
        return

    # Validate before taking the lock or deleting anything, so a rejected
    # request cannot leave loose refs removed without having been packed.
    if HEADREF in new_refs:
        raise ValueError("cannot pack HEAD")

    path = os.path.join(self.path, b"packed-refs")

    with GitFile(path, "wb") as f:
        # Work on a copy of the (possibly cached) packed refs while holding
        # the lock.
        packed_refs = self.get_packed_refs().copy()

        for ref, target in new_refs.items():
            # remove any loose refs pointing to this one -- please
            # note that this bypasses remove_if_equals as we don't
            # want to affect packed refs in here
            with suppress(OSError):
                os.remove(self.refpath(ref))

            if target is not None:
                packed_refs[ref] = target
            else:
                packed_refs.pop(ref, None)

        # NOTE(review): peeled entries for refs changed or removed above are
        # passed through unchanged -- confirm write_packed_refs copes.
        write_packed_refs(f, packed_refs, self._peeled_refs)

    # Update the cache only once the file has been written and closed.
    self._packed_refs = packed_refs

978 

979 def get_peeled(self, name: bytes) -> Optional[bytes]: 

980 """Return the cached peeled value of a ref, if available. 

981 

982 Args: 

983 name: Name of the ref to peel 

984 Returns: The peeled value of the ref. If the ref is known not point to 

985 a tag, this will be the SHA the ref refers to. If the ref may point 

986 to a tag, but no cached information is available, None is returned. 

987 """ 

988 self.get_packed_refs() 

989 if ( 

990 self._peeled_refs is None 

991 or self._packed_refs is None 

992 or name not in self._packed_refs 

993 ): 

994 # No cache: no peeled refs were read, or this ref is loose 

995 return None 

996 if name in self._peeled_refs: 

997 return self._peeled_refs[name] 

998 else: 

999 # Known not peelable 

1000 return self[name] 

1001 

def read_loose_ref(self, name: bytes) -> Optional[bytes]:
    """Read a reference file and return its contents.

    If the reference file a symbolic reference, only read the first line of
    the file. Otherwise, only read the first 40 bytes.

    Args:
      name: the refname to read, relative to refpath
    Returns: The contents of the ref file, or None if the file does not
        exist or cannot be read.
    """
    filename = self.refpath(name)
    try:
        with GitFile(filename, "rb") as f:
            header = f.read(len(SYMREF))
            if header == SYMREF:
                # Read only the first line. Default to b"" so a file that
                # ends right after the marker does not raise StopIteration.
                return header + next(iter(f), b"").rstrip(b"\r\n")
            else:
                # Read only the first 40 bytes
                return header + f.read(40 - len(SYMREF))
    except (OSError, UnicodeError):
        # don't assume anything specific about the error; in
        # particular, invalid or forbidden paths can raise weird
        # errors depending on the specific operating system
        return None

1031 

def _remove_packed_ref(self, name: bytes) -> None:
    """Drop ``name`` from the packed-refs file, if it is listed there.

    Does nothing when no packed refs are cached. Otherwise the packed refs
    are reread while the packed-refs file is held open for writing, the
    entry (and any peeled entry) is removed, and the file is rewritten.

    Args:
      name: Name of the ref to remove from packed-refs
    """
    if self._packed_refs is None:
        return

    packed_path = os.path.join(self.path, b"packed-refs")
    # reread cached refs from disk, while holding the lock
    lock = GitFile(packed_path, "wb")
    try:
        self._packed_refs = None
        self.get_packed_refs()

        current = self._packed_refs
        if current is None or name not in current:
            # Nothing to remove; discard the pending write
            lock.abort()
            return

        del current[name]
        peeled = self._peeled_refs
        if peeled is not None:
            peeled.pop(name, None)
        write_packed_refs(lock, current, peeled)
        lock.close()
    except BaseException:
        lock.abort()
        raise

1055 

def set_symbolic_ref(
    self,
    name: bytes,
    other: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> None:
    """Make a ref point at another ref.

    Both names are validated with ``_check_refname`` first. The symref is
    written and a reflog entry is recorded; if either step fails the
    pending write is aborted and the exception is re-raised.

    Args:
      name: Name of the ref to set
      other: Name of the ref to point at
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message to describe the change
    """
    self._check_refname(name)
    self._check_refname(other)

    log_fields = {
        "committer": committer,
        "timestamp": timestamp,
        "timezone": timezone,
        "message": message,
    }
    lock = GitFile(self.refpath(name), "wb")
    try:
        lock.write(SYMREF + other + b"\n")
        # The same value is logged as both the old and the new SHA
        resolved = self.follow(name)[-1]
        self._log(name, resolved, resolved, **log_fields)
    except BaseException:
        lock.abort()
        raise
    lock.close()

1096 

def set_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    new_ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Set a refname to new_ref only if it currently equals old_ref.

    This method follows all symbolic references, and can be used to perform
    an atomic compare-and-swap operation.

    Args:
      name: The refname to set.
      old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
      new_ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Set message for reflog
    Returns: True if the set was successful, False otherwise.

    Raises:
      NotADirectoryError: if an ancestor path of the resolved ref name is
        itself a packed ref
    """
    self._check_refname(name)
    try:
        chain, _ = self.follow(name)
        realname = chain[-1]
    except (KeyError, IndexError, SymrefLoop):
        # Unresolvable: fall back to operating on the given name
        realname = name
    filename = self.refpath(realname)

    # make sure none of the ancestor folders is in packed refs
    packed_refs = self.get_packed_refs()
    ancestor = os.path.dirname(realname)
    while ancestor:
        if packed_refs.get(ancestor) is not None:
            raise NotADirectoryError(filename)
        ancestor = os.path.dirname(ancestor)

    log_fields = {
        "committer": committer,
        "timestamp": timestamp,
        "timezone": timezone,
        "message": message,
    }

    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as lock:
        if old_ref is not None:
            try:
                # read again while holding the lock to handle race conditions
                existing = self.read_loose_ref(realname)
                if existing is None:
                    existing = self.get_packed_refs().get(realname, ZERO_SHA)
                if existing != old_ref:
                    lock.abort()
                    return False
            except OSError:
                lock.abort()
                raise

        # Check if ref already has the desired value while holding the lock
        # This avoids fsync when ref is unchanged but still detects lock conflicts
        present = self.read_loose_ref(realname)
        if present is None:
            present = packed_refs.get(realname)

        if present is not None and present == new_ref:
            # Ref already has desired value, abort write to avoid fsync
            lock.abort()
            return True

        try:
            lock.write(new_ref + b"\n")
        except OSError:
            lock.abort()
            raise
        self._log(realname, old_ref, new_ref, **log_fields)
    return True

1180 

def add_if_new(
    self,
    name: bytes,
    ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Add a new reference only if it does not already exist.

    This method follows symrefs, and only ensures that the last ref in the
    chain does not exist.

    Args:
      name: The refname to set.
      ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message for reflog
    Returns: True if the add was successful, False otherwise.
    """
    try:
        chain, contents = self.follow(name)
        if contents is not None:
            # The chain already resolves to a value
            return False
        realname = chain[-1]
    except (KeyError, IndexError):
        realname = name
    self._check_refname(realname)

    filename = self.refpath(realname)
    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as lock:
        # Re-check for an existing ref now that the file is open for writing
        already_present = os.path.exists(filename) or name in self.get_packed_refs()
        if already_present:
            lock.abort()
            return False
        try:
            lock.write(ref + b"\n")
        except OSError:
            lock.abort()
            raise
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True

1234 

def remove_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Remove a refname only if it currently equals old_ref.

    This method does not follow symbolic references. It can be used to
    perform an atomic compare-and-delete operation.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    filename = self.refpath(name)
    ensure_dir_exists(os.path.dirname(filename))
    lock = GitFile(filename, "wb")
    try:
        if old_ref is not None:
            existing = self.read_loose_ref(name)
            if existing is None:
                existing = self.get_packed_refs().get(name, ZERO_SHA)
            if existing != old_ref:
                return False

        # remove the reference file itself
        try:
            on_disk = os.path.lexists(filename)
        except OSError:
            # may only be packed, or otherwise unstorable
            on_disk = False
        if on_disk:
            os.remove(filename)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        lock.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    parent = name
    while b"/" in parent:
        parent = parent.rsplit(b"/", 1)[0]
        if parent == b"refs":
            break
        parent_filename = self.refpath(parent)
        try:
            os.rmdir(parent_filename)
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True

1318 

def pack_refs(self, all: bool = False) -> None:
    """Pack loose refs into packed-refs file.

    HEAD is never packed, and refs that cannot be read or have an empty
    value are skipped.

    Args:
      all: If True, pack all refs. If False, only pack refs whose name
        starts with ``LOCAL_TAG_PREFIX``.
    """
    selected: dict[Ref, Optional[ObjectID]] = {}
    for ref in self.allkeys():
        if ref == HEADREF:
            # Never pack HEAD
            continue
        if not (all or ref.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            sha = self[ref]
        except KeyError:
            # Broken ref, skip it
            continue
        if sha:
            selected[ref] = sha

    if selected:
        self.add_packed_refs(selected)

1341 

1342 

def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
    """Split a single ref line into a tuple of SHA1 and name.

    Args:
      line: One line from a packed-refs file, with or without line ending
    Returns: Tuple of (sha, name)

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the SHA is not valid hex, or the ref name
        fails ``check_ref_format``
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")

    sha = parts[0]
    name = parts[1]
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(name):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (sha, name)

1354 

1355 

def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
    """Read a packed refs file that has no peeled entries.

    Lines starting with ``#`` are skipped as comments.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.

    Raises:
      PackedRefsException: if a peeled (``^``) line is encountered or a
        ref line is malformed
    """
    for raw in f:
        lead = raw[:1]
        if lead == b"#":
            # Comment
            continue
        if lead == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(raw)

1370 

1371 

def read_packed_refs_with_peeled(
    f: IO[bytes],
) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of (SHA1, ref name, peeled SHA1 or None).

    Args:
      f: file-like object to read from, seek'ed to the second line

    Raises:
      PackedRefsException: if a peeled line has no preceding ref line, a
        peeled SHA is not valid hex, or a ref line is malformed
    """
    # A ref line is held back until the next line shows whether a peeled
    # value follows it.
    pending = None
    for raw in f:
        if raw.startswith(b"#"):
            continue
        entry = raw.rstrip(b"\r\n")
        if not entry.startswith(b"^"):
            if pending:
                sha, name = _split_ref_line(pending)
                yield (sha, name, None)
            pending = entry
            continue

        # Peeled line: it applies to the ref line held back above
        if not pending:
            raise PackedRefsException("unexpected peeled ref line")
        peeled_sha = entry[1:]
        if not valid_hexsha(peeled_sha):
            raise PackedRefsException(f"Invalid hex sha {peeled_sha!r}")
        sha, name = _split_ref_line(pending)
        pending = None
        yield (sha, name, peeled_sha)

    # Flush the final ref line, which had no peeled line after it
    if pending:
        sha, name = _split_ref_line(pending)
        yield (sha, name, None)

1404 

1405 

def write_packed_refs(
    f: IO[bytes],
    packed_refs: Mapping[bytes, bytes],
    peeled_refs: Optional[Mapping[bytes, bytes]] = None,
) -> None:
    """Write a packed refs file.

    The "# pack-refs with: peeled" header is written whenever
    ``peeled_refs`` is not None, even if it is empty. Refs are written in
    sorted name order, each followed by its ``^`` peeled line if it has one.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    peeled = {} if peeled_refs is None else peeled_refs

    for refname in sorted(packed_refs):
        f.write(git_line(packed_refs[refname], refname))
        if refname in peeled:
            f.write(b"^" + peeled[refname] + b"\n")

1426 

1427 

def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
    """Read info/refs file.

    Each line is split at its first tab into a SHA1 and a ref name; a line
    without a tab raises ``ValueError``. If a name occurs more than once,
    the last occurrence wins.

    Args:
      f: File-like object to read from

    Returns:
      Dictionary mapping ref names to SHA1s
    """
    entries = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {name: sha for sha, name in entries}

1442 

1443 

def write_info_refs(
    refs: Mapping[bytes, bytes], store: ObjectContainer
) -> Iterator[bytes]:
    """Generate info refs.

    Refs are emitted in sorted order as ``<sha>\\t<name>\\n`` lines. HEAD and
    refs whose object is missing from ``store`` are skipped. When peeling a
    ref yields a different object, an extra line with the peeled id and the
    name plus ``PEELED_TAG_SUFFIX`` follows.

    Args:
      refs: Mapping of ref names to SHAs
      store: Object container used to look up and peel the objects
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for name, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _unpeeled, peeled_obj = peel_sha(store, sha)
        yield obj.id + b"\t" + name + b"\n"
        if obj.id == peeled_obj.id:
            continue
        yield peeled_obj.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"

1464 

1465 

def is_local_branch(x: bytes) -> bool:
    """Check if a ref name is a local branch.

    Args:
      x: Ref name to check
    Returns: True if the name starts with ``LOCAL_BRANCH_PREFIX``
    """
    has_branch_prefix = x.startswith(LOCAL_BRANCH_PREFIX)
    return has_branch_prefix

1469 

1470 

# Type variable for ref dictionaries: either every value is a SHA, or the
# values may also be None.
T = TypeVar(
    "T",
    dict[bytes, bytes],
    dict[bytes, Optional[bytes]],
)

1472 

1473 

def strip_peeled_refs(refs: T) -> T:
    """Remove all peeled refs.

    Args:
      refs: Dictionary of ref names to SHAs
    Returns: New dictionary without the entries whose name ends with
      ``PEELED_TAG_SUFFIX``
    """
    kept = {}
    for ref, sha in refs.items():
        if ref.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[ref] = sha
    return kept

1479 

1480 

def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
    """Split peeled refs from regular refs.

    Args:
      refs: Dictionary of ref names to SHAs, possibly containing entries
        whose name ends with ``PEELED_TAG_SUFFIX``
    Returns: Tuple of (regular refs, peeled refs). The peeled dictionary is
      keyed by the ref name with the suffix removed; peeled entries whose
      value is None are dropped.
    """
    regular = {}
    peeled: dict[bytes, bytes] = {}
    suffix_len = len(PEELED_TAG_SUFFIX)

    for ref, sha in refs.items():
        if not ref.endswith(PEELED_TAG_SUFFIX):
            regular[ref] = sha
        elif sha is not None:
            # Only add to peeled dict if sha is not None
            peeled[ref[:-suffix_len]] = sha

    return regular, peeled

1493 

1494 

def _set_origin_head(
    refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
) -> None:
    """Point ``refs/remotes/<origin>/HEAD`` at the remote's default branch.

    Nothing is done unless ``origin_head`` starts with
    ``LOCAL_BRANCH_PREFIX`` and the matching remote-tracking ref exists.

    Args:
      refs: Refs container to update
      origin: Name of the remote
      origin_head: Ref name the remote HEAD points at, if known
    """
    # set refs/remotes/origin/HEAD
    origin_base = b"refs/remotes/" + origin + b"/"
    if not origin_head:
        return
    if not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return

    branch_name = origin_head[len(LOCAL_BRANCH_PREFIX) :]
    origin_ref = origin_base + HEADREF
    target_ref = origin_base + branch_name
    if target_ref in refs:
        refs.set_symbolic_ref(origin_ref, target_ref)

1505 

1506 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: Optional[bytes],
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    If ``branch`` is given it must exist either as a remote-tracking branch
    of ``origin`` (a local branch is then created from it if missing) or as
    a local tag. Otherwise ``origin_head`` is used and a local ref for it is
    created from the corresponding remote ref when that ref exists.

    Args:
      refs: Refs container to update
      origin: Name of the remote
      origin_head: Ref name the remote HEAD points at, if known
      branch: Explicitly requested branch or tag name, if any
      ref_message: Reflog message to record for newly created refs
    Returns: Name of the ref that HEAD should be set to

    Raises:
      ValueError: if ``branch`` is neither a remote branch nor a tag, or if
        neither ``branch`` nor ``origin_head`` is provided
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            # Pass the message by keyword: the third positional parameter
            # of add_if_new is the committer, not the reflog message.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif LOCAL_TAG_PREFIX + branch in refs:
            head_ref = LOCAL_TAG_PREFIX + branch
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The remote ref is not present locally; nothing to create
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref

1539 

1540 

def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
) -> Optional[bytes]:
    """Update HEAD to refer to ``head_ref``.

    For a ref under ``LOCAL_TAG_PREFIX`` HEAD is detached at the tag's
    value. For any other ref HEAD becomes a symbolic ref to it.

    Args:
      refs: Refs container to update
      head_ref: Ref name that HEAD should refer to
      ref_message: Reflog message for the HEAD update
    Returns: The value HEAD was set to, or None if a ``KeyError`` was raised
      while setting HEAD to a branch.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            return None
        return target

    # detach HEAD at specified tag
    target = refs[head_ref]
    if isinstance(target, Tag):
        _cls, tagged = target.object
        target = tagged.get_object(tagged).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, target, message=ref_message)
    return target

1561 

1562 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, Optional[bytes]],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into a refs container.

    Branches are imported under ``refs/remotes/<remote_name>`` and tags
    under ``LOCAL_TAG_PREFIX``. Peeled entries and entries whose value is
    None are ignored.

    Args:
      refs_container: Refs container to import into
      remote_name: Name of the remote
      refs: Refs advertised by the remote
      message: Optional reflog message
      prune: Passed to ``import_refs`` for the branch import
      prune_tags: Passed to ``import_refs`` for the tag import
    """
    stripped_refs = strip_peeled_refs(refs)

    branches = {}
    tags = {}
    for ref_name, value in stripped_refs.items():
        if value is None:
            continue
        if ref_name.startswith(LOCAL_BRANCH_PREFIX):
            branches[ref_name[len(LOCAL_BRANCH_PREFIX) :]] = value
        if ref_name.startswith(LOCAL_TAG_PREFIX) and not ref_name.endswith(
            PEELED_TAG_SUFFIX
        ):
            tags[ref_name[len(LOCAL_TAG_PREFIX) :]] = value

    remote_base = b"refs/remotes/" + remote_name.encode()
    refs_container.import_refs(
        remote_base,
        branches,
        message=message,
        prune=prune,
    )
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )

1593 

1594 

def serialize_refs(
    store: ObjectContainer, refs: Mapping[bytes, bytes]
) -> dict[bytes, bytes]:
    """Serialize refs with peeled refs.

    Refs whose SHA cannot be peeled because of a ``KeyError`` are left out
    and reported with a ``UserWarning``.

    Args:
      store: Object store to peel refs from
      refs: Dictionary of ref names to SHAs

    Returns:
      Dictionary with refs and peeled refs (marked with ^{})
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for ref, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            ref_text = ref.decode("utf-8", "replace")
            sha_text = sha.decode("ascii")
            warnings.warn(
                "ref {} points at non-present sha {}".format(ref_text, sha_text),
                UserWarning,
            )
            continue

        if isinstance(unpeeled, Tag):
            # Tags additionally advertise the object they peel to
            serialized[ref + PEELED_TAG_SUFFIX] = peeled.id
        serialized[ref] = unpeeled.id
    return serialized

1627 

1628 

class locked_ref:
    """Lock a ref while making modifications.

    Works as a context manager. On exit the pending write is committed
    unless an exception occurred or the ref was deleted, in which case it
    is aborted.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        """Initialize a locked ref.

        Args:
          refs_container: The DiskRefsContainer to lock the ref in
          refname: The ref name to lock
        """
        self._refs_container = refs_container
        self._refname = refname
        # Lock file handle; only set while inside the context
        self._file: Optional[_GitFile] = None
        # Name of the ref actually locked, after following symrefs
        self._realname: Optional[Ref] = None
        self._deleted = False

    def __enter__(self) -> "locked_ref":
        """Enter the context manager and acquire the lock.

        Returns:
          This locked_ref instance

        Raises:
          OSError: If the lock cannot be acquired
        """
        container = self._refs_container
        container._check_refname(self._refname)
        try:
            chain, _ = container.follow(self._refname)
            self._realname = chain[-1]
        except (KeyError, IndexError, SymrefLoop):
            # Unresolvable: lock the given name itself
            self._realname = self._refname

        filename = container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(filename))
        self._file = GitFile(filename, "wb")
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        """Exit the context manager and release the lock.

        Args:
          exc_type: Type of exception if one occurred
          exc_value: Exception instance if one occurred
          traceback: Traceback if an exception occurred
        """
        lock = self._file
        if not lock:
            return
        discard = exc_type is not None or self._deleted
        if discard:
            lock.abort()
        else:
            lock.close()

    def get(self) -> Optional[bytes]:
        """Get the current value of the ref.

        Returns:
          The loose ref value if there is one, otherwise the packed ref
          value, otherwise None

        Raises:
          RuntimeError: If called outside the context
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        assert self._realname is not None
        container = self._refs_container
        value = container.read_loose_ref(self._realname)
        if value is not None:
            return value
        return container.get_packed_refs().get(self._realname, None)

    def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
        """Ensure the ref currently equals the expected value.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        return self.get() == expected_value

    def set(self, new_ref: bytes) -> None:
        """Set the ref to a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value

        Raises:
          RuntimeError: If called outside the context
          ValueError: If ``new_ref`` is neither a valid sha nor a symref
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        acceptable = valid_hexsha(new_ref) or new_ref.startswith(SYMREF)
        if not acceptable:
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Replace whatever was written to the lock file so far
        lock = self._file
        lock.seek(0)
        lock.truncate()
        lock.write(new_ref + b"\n")
        self._deleted = False

    def set_symbolic_ref(self, target: Ref) -> None:
        """Make this ref point at another ref.

        Args:
          target: Name of the ref to point at

        Raises:
          RuntimeError: If called outside the context
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        self._refs_container._check_refname(target)
        # Replace whatever was written to the lock file so far
        lock = self._file
        lock.seek(0)
        lock.truncate()
        lock.write(SYMREF + target + b"\n")
        self._deleted = False

    def delete(self) -> None:
        """Delete the ref file while holding the lock.

        Raises:
          RuntimeError: If called outside the context
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        # Delete the actual ref file while holding the lock
        realname = self._realname
        if realname:
            container = self._refs_container
            filename = container.refpath(realname)
            with suppress(FileNotFoundError):
                if os.path.lexists(filename):
                    os.remove(filename)
            container._remove_packed_ref(realname)

        self._deleted = True

1761 

1762 

def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
    """Filter refs to only include those with a given prefix.

    Args:
      refs: A dictionary of refs.
      prefixes: The prefixes to filter by. Any iterable is accepted,
        including one-shot iterators and generators.
    Returns: New dictionary with the refs whose name starts with at least
      one of the prefixes. Empty if no prefixes are given.
    """
    # Materialize once: ``prefixes`` may be a one-shot iterator, which would
    # otherwise be exhausted while checking the first ref and silently drop
    # all later ones. A tuple also lets startswith() test every prefix in a
    # single call.
    wanted = tuple(prefixes)
    return {k: v for k, v in refs.items() if k.startswith(wanted)}

1772 

1773 

def is_per_worktree_ref(ref: bytes) -> bool:
    """Returns whether a reference is stored per worktree or not.

    Per-worktree references are:
    - all pseudorefs, e.g. HEAD
    - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"

    All refs starting with "refs/" are shared, except for the ones listed above.

    See https://git-scm.com/docs/git-worktree#_refs.
    """
    if not ref.startswith(b"refs/"):
        # Anything outside refs/ (pseudorefs such as HEAD) is per worktree
        return True
    per_worktree_prefixes = (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
    return ref.startswith(per_worktree_prefixes)