Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

715 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import types 

27import warnings 

28from collections.abc import Iterable, Iterator 

29from contextlib import suppress 

30from typing import ( 

31 IO, 

32 TYPE_CHECKING, 

33 Any, 

34 BinaryIO, 

35 Callable, 

36 Optional, 

37 TypeVar, 

38 Union, 

39 cast, 

40) 

41 

42if TYPE_CHECKING: 

43 from .file import _GitFile 

44 

45from .errors import PackedRefsException, RefFormatError 

46from .file import GitFile, ensure_dir_exists 

47from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

48from .pack import ObjectContainer 

49 

# A ref name is represented as a plain byte string.
Ref = bytes

# Name of the HEAD ref and the marker that introduces a symbolic ref's target.
HEADREF = b"HEAD"
SYMREF = b"ref: "

# Prefixes of the standard ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"

# Byte values (as ints) that check_ref_format rejects anywhere in a ref name.
BAD_REF_CHARS = {*b"\177 ~^:?*["}

# Suffix that marks the peeled value of a tag.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

63 

64 

class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    Attributes:
      ref: The ref name that was passed to the constructor
      depth: The depth value that was passed to the constructor
    """

    def __init__(self, ref: bytes, depth: int) -> None:
        """Initialize SymrefLoop exception.

        Args:
          ref: Name of the ref being resolved
          depth: Number of indirections followed so far
        """
        # Pass the details on to Exception so that args, str() and repr()
        # describe the failure instead of being empty.
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth

72 

73 

def parse_symref_value(contents: bytes) -> bytes:
    """Extract the destination of a symbolic ref from its raw contents.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The destination, with trailing CR/LF bytes removed
    Raises:
      ValueError: If contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF) :]
    return destination.rstrip(b"\r\n")

84 

85 

def check_ref_format(refname: Ref) -> bool:
    """Tell whether a refname is well formed.

    The individual checks mirror the rules of git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # Each rule is kept as its own guard so the list reads in parallel
    # with [1].
    if refname.startswith(b".") or b"/." in refname:
        return False
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # Iterating bytes yields ints: reject control characters (below 0o40)
    # and anything listed in BAD_REF_CHARS.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # refname is non-empty here because it contains at least one slash.
    if refname.endswith((b"/", b".")):
        return False
    if refname.endswith(b".lock"):
        return False
    has_reflog_syntax = b"@{" in refname
    has_backslash = b"\\" in refname
    return not (has_reflog_syntax or has_backslash)

118 

119 

def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Parse a remote ref into remote name and branch name.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name). Only the first slash after the
      prefix separates the two, so branch_name may itself contain slashes.

    Raises:
      ValueError: If ref does not start with the remote prefix, or if
        either the remote name or the branch name is missing or empty
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    # Split what follows the prefix at the first slash only.
    remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
    remote_name, separator, branch_name = remainder.partition(b"/")

    # Refs such as b"refs/remotes/origin/" or b"refs/remotes//main" would
    # otherwise yield an empty component; report them as invalid instead.
    if not separator or not remote_name or not branch_name:
        raise ValueError(f"Invalid remote ref format: {ref!r}")

    return (remote_name, branch_name)

145 

146 

147class RefsContainer: 

148 """A container for refs.""" 

149 

150 def __init__( 

151 self, 

152 logger: Optional[ 

153 Callable[ 

154 [ 

155 bytes, 

156 Optional[bytes], 

157 Optional[bytes], 

158 Optional[bytes], 

159 Optional[int], 

160 Optional[int], 

161 Optional[bytes], 

162 ], 

163 None, 

164 ] 

165 ] = None, 

166 ) -> None: 

167 """Initialize RefsContainer with optional logger function.""" 

168 self._logger = logger 

169 

170 def _log( 

171 self, 

172 ref: bytes, 

173 old_sha: Optional[bytes], 

174 new_sha: Optional[bytes], 

175 committer: Optional[bytes] = None, 

176 timestamp: Optional[int] = None, 

177 timezone: Optional[int] = None, 

178 message: Optional[bytes] = None, 

179 ) -> None: 

180 if self._logger is None: 

181 return 

182 if message is None: 

183 return 

184 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message) 

185 

186 def set_symbolic_ref( 

187 self, 

188 name: bytes, 

189 other: bytes, 

190 committer: Optional[bytes] = None, 

191 timestamp: Optional[int] = None, 

192 timezone: Optional[int] = None, 

193 message: Optional[bytes] = None, 

194 ) -> None: 

195 """Make a ref point at another ref. 

196 

197 Args: 

198 name: Name of the ref to set 

199 other: Name of the ref to point at 

200 committer: Optional committer name/email 

201 timestamp: Optional timestamp 

202 timezone: Optional timezone 

203 message: Optional message 

204 """ 

205 raise NotImplementedError(self.set_symbolic_ref) 

206 

207 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

208 """Get contents of the packed-refs file. 

209 

210 Returns: Dictionary mapping ref names to SHA1s 

211 

212 Note: Will return an empty dictionary when no packed-refs file is 

213 present. 

214 """ 

215 raise NotImplementedError(self.get_packed_refs) 

216 

217 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None: 

218 """Add the given refs as packed refs. 

219 

220 Args: 

221 new_refs: A mapping of ref names to targets; if a target is None that 

222 means remove the ref 

223 """ 

224 raise NotImplementedError(self.add_packed_refs) 

225 

226 def get_peeled(self, name: bytes) -> Optional[ObjectID]: 

227 """Return the cached peeled value of a ref, if available. 

228 

229 Args: 

230 name: Name of the ref to peel 

231 Returns: The peeled value of the ref. If the ref is known not point to 

232 a tag, this will be the SHA the ref refers to. If the ref may point 

233 to a tag, but no cached information is available, None is returned. 

234 """ 

235 return None 

236 

237 def import_refs( 

238 self, 

239 base: Ref, 

240 other: dict[Ref, ObjectID], 

241 committer: Optional[bytes] = None, 

242 timestamp: Optional[bytes] = None, 

243 timezone: Optional[bytes] = None, 

244 message: Optional[bytes] = None, 

245 prune: bool = False, 

246 ) -> None: 

247 """Import refs from another repository. 

248 

249 Args: 

250 base: Base ref to import into (e.g., b'refs/remotes/origin') 

251 other: Dictionary of refs to import 

252 committer: Optional committer for reflog 

253 timestamp: Optional timestamp for reflog 

254 timezone: Optional timezone for reflog 

255 message: Optional message for reflog 

256 prune: If True, remove refs not in other 

257 """ 

258 if prune: 

259 to_delete = set(self.subkeys(base)) 

260 else: 

261 to_delete = set() 

262 for name, value in other.items(): 

263 if value is None: 

264 to_delete.add(name) 

265 else: 

266 self.set_if_equals( 

267 b"/".join((base, name)), None, value, message=message 

268 ) 

269 if to_delete: 

270 try: 

271 to_delete.remove(name) 

272 except KeyError: 

273 pass 

274 for ref in to_delete: 

275 self.remove_if_equals(b"/".join((base, ref)), None, message=message) 

276 

277 def allkeys(self) -> set[Ref]: 

278 """All refs present in this container.""" 

279 raise NotImplementedError(self.allkeys) 

280 

281 def __iter__(self) -> Iterator[Ref]: 

282 """Iterate over all reference keys.""" 

283 return iter(self.allkeys()) 

284 

285 def keys(self, base=None): 

286 """Refs present in this container. 

287 

288 Args: 

289 base: An optional base to return refs under. 

290 Returns: An unsorted set of valid refs in this container, including 

291 packed refs. 

292 """ 

293 if base is not None: 

294 return self.subkeys(base) 

295 else: 

296 return self.allkeys() 

297 

298 def subkeys(self, base: bytes) -> set[bytes]: 

299 """Refs present in this container under a base. 

300 

301 Args: 

302 base: The base to return refs under. 

303 Returns: A set of valid refs in this container under the base; the base 

304 prefix is stripped from the ref names returned. 

305 """ 

306 keys = set() 

307 base_len = len(base) + 1 

308 for refname in self.allkeys(): 

309 if refname.startswith(base): 

310 keys.add(refname[base_len:]) 

311 return keys 

312 

313 def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]: 

314 """Return the contents of this container as a dictionary.""" 

315 ret = {} 

316 keys = self.keys(base) 

317 if base is None: 

318 base = b"" 

319 else: 

320 base = base.rstrip(b"/") 

321 for key in keys: 

322 try: 

323 ret[key] = self[(base + b"/" + key).strip(b"/")] 

324 except (SymrefLoop, KeyError): 

325 continue # Unable to resolve 

326 

327 return ret 

328 

329 def _check_refname(self, name: bytes) -> None: 

330 """Ensure a refname is valid and lives in refs or is HEAD. 

331 

332 HEAD is not a valid refname according to git-check-ref-format, but this 

333 class needs to be able to touch HEAD. Also, check_ref_format expects 

334 refnames without the leading 'refs/', but this class requires that 

335 so it cannot touch anything outside the refs dir (or HEAD). 

336 

337 Args: 

338 name: The name of the reference. 

339 

340 Raises: 

341 KeyError: if a refname is not HEAD or is otherwise not valid. 

342 """ 

343 if name in (HEADREF, b"refs/stash"): 

344 return 

345 if not name.startswith(b"refs/") or not check_ref_format(name[5:]): 

346 raise RefFormatError(name) 

347 

348 def read_ref(self, refname: bytes) -> Optional[bytes]: 

349 """Read a reference without following any references. 

350 

351 Args: 

352 refname: The name of the reference 

353 Returns: The contents of the ref file, or None if it does 

354 not exist. 

355 """ 

356 contents = self.read_loose_ref(refname) 

357 if not contents: 

358 contents = self.get_packed_refs().get(refname, None) 

359 return contents 

360 

361 def read_loose_ref(self, name: bytes) -> Optional[bytes]: 

362 """Read a loose reference and return its contents. 

363 

364 Args: 

365 name: the refname to read 

366 Returns: The contents of the ref file, or None if it does 

367 not exist. 

368 """ 

369 raise NotImplementedError(self.read_loose_ref) 

370 

371 def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]: 

372 """Follow a reference name. 

373 

374 Returns: a tuple of (refnames, sha), wheres refnames are the names of 

375 references in the chain 

376 """ 

377 contents: Optional[bytes] = SYMREF + name 

378 depth = 0 

379 refnames = [] 

380 while contents and contents.startswith(SYMREF): 

381 refname = contents[len(SYMREF) :] 

382 refnames.append(refname) 

383 contents = self.read_ref(refname) 

384 if not contents: 

385 break 

386 depth += 1 

387 if depth > 5: 

388 raise SymrefLoop(name, depth) 

389 return refnames, contents 

390 

391 def __contains__(self, refname: bytes) -> bool: 

392 """Check if a reference exists.""" 

393 if self.read_ref(refname): 

394 return True 

395 return False 

396 

397 def __getitem__(self, name: bytes) -> ObjectID: 

398 """Get the SHA1 for a reference name. 

399 

400 This method follows all symbolic references. 

401 """ 

402 _, sha = self.follow(name) 

403 if sha is None: 

404 raise KeyError(name) 

405 return sha 

406 

407 def set_if_equals( 

408 self, 

409 name: bytes, 

410 old_ref: Optional[bytes], 

411 new_ref: bytes, 

412 committer: Optional[bytes] = None, 

413 timestamp: Optional[int] = None, 

414 timezone: Optional[int] = None, 

415 message: Optional[bytes] = None, 

416 ) -> bool: 

417 """Set a refname to new_ref only if it currently equals old_ref. 

418 

419 This method follows all symbolic references if applicable for the 

420 subclass, and can be used to perform an atomic compare-and-swap 

421 operation. 

422 

423 Args: 

424 name: The refname to set. 

425 old_ref: The old sha the refname must refer to, or None to set 

426 unconditionally. 

427 new_ref: The new sha the refname will refer to. 

428 committer: Optional committer name/email 

429 timestamp: Optional timestamp 

430 timezone: Optional timezone 

431 message: Message for reflog 

432 Returns: True if the set was successful, False otherwise. 

433 """ 

434 raise NotImplementedError(self.set_if_equals) 

435 

436 def add_if_new( 

437 self, 

438 name: bytes, 

439 ref: bytes, 

440 committer: Optional[bytes] = None, 

441 timestamp: Optional[int] = None, 

442 timezone: Optional[int] = None, 

443 message: Optional[bytes] = None, 

444 ) -> bool: 

445 """Add a new reference only if it does not already exist. 

446 

447 Args: 

448 name: Ref name 

449 ref: Ref value 

450 committer: Optional committer name/email 

451 timestamp: Optional timestamp 

452 timezone: Optional timezone 

453 message: Optional message for reflog 

454 """ 

455 raise NotImplementedError(self.add_if_new) 

456 

457 def __setitem__(self, name: bytes, ref: bytes) -> None: 

458 """Set a reference name to point to the given SHA1. 

459 

460 This method follows all symbolic references if applicable for the 

461 subclass. 

462 

463 Note: This method unconditionally overwrites the contents of a 

464 reference. To update atomically only if the reference has not 

465 changed, use set_if_equals(). 

466 

467 Args: 

468 name: The refname to set. 

469 ref: The new sha the refname will refer to. 

470 """ 

471 if not (valid_hexsha(ref) or ref.startswith(SYMREF)): 

472 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref") 

473 self.set_if_equals(name, None, ref) 

474 

475 def remove_if_equals( 

476 self, 

477 name: bytes, 

478 old_ref: Optional[bytes], 

479 committer: Optional[bytes] = None, 

480 timestamp: Optional[int] = None, 

481 timezone: Optional[int] = None, 

482 message: Optional[bytes] = None, 

483 ) -> bool: 

484 """Remove a refname only if it currently equals old_ref. 

485 

486 This method does not follow symbolic references, even if applicable for 

487 the subclass. It can be used to perform an atomic compare-and-delete 

488 operation. 

489 

490 Args: 

491 name: The refname to delete. 

492 old_ref: The old sha the refname must refer to, or None to 

493 delete unconditionally. 

494 committer: Optional committer name/email 

495 timestamp: Optional timestamp 

496 timezone: Optional timezone 

497 message: Message for reflog 

498 Returns: True if the delete was successful, False otherwise. 

499 """ 

500 raise NotImplementedError(self.remove_if_equals) 

501 

502 def __delitem__(self, name: bytes) -> None: 

503 """Remove a refname. 

504 

505 This method does not follow symbolic references, even if applicable for 

506 the subclass. 

507 

508 Note: This method unconditionally deletes the contents of a reference. 

509 To delete atomically only if the reference has not changed, use 

510 remove_if_equals(). 

511 

512 Args: 

513 name: The refname to delete. 

514 """ 

515 self.remove_if_equals(name, None) 

516 

517 def get_symrefs(self) -> dict[bytes, bytes]: 

518 """Get a dict with all symrefs in this container. 

519 

520 Returns: Dictionary mapping source ref to target ref 

521 """ 

522 ret = {} 

523 for src in self.allkeys(): 

524 try: 

525 ref_value = self.read_ref(src) 

526 assert ref_value is not None 

527 dst = parse_symref_value(ref_value) 

528 except ValueError: 

529 pass 

530 else: 

531 ret[src] = dst 

532 return ret 

533 

534 def pack_refs(self, all: bool = False) -> None: 

535 """Pack loose refs into packed-refs file. 

536 

537 Args: 

538 all: If True, pack all refs. If False, only pack tags. 

539 """ 

540 raise NotImplementedError(self.pack_refs) 

541 

542 

class DictRefsContainer(RefsContainer):
    """RefsContainer that keeps its refs in a plain dict.

    Packed refs are not supported (``get_packed_refs`` is always empty) and
    the container is not threadsafe.
    """

    def __init__(
        self,
        refs: dict[bytes, bytes],
        logger: Optional[
            Callable[
                [
                    bytes,
                    Optional[bytes],
                    Optional[bytes],
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    Optional[bytes],
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Set up the container around an existing refs dict.

        Args:
          refs: Dictionary mapping ref names to values; it is used directly,
            not copied
          logger: Optional logger callable, handed to RefsContainer
        """
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self) -> set[bytes]:
        """Return the names of all refs held in the dict."""
        return set(self._refs)

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Return the stored value for name, or None when it is absent."""
        return self._refs.get(name)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Return an empty dict; this container has no packed refs."""
        return {}

    def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
        # Every registered watcher receives the same (ref, newsha) pair.
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Store a symbolic ref from name to other.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
        """
        # Resolve the previous value before overwriting, for the log entry.
        _, previous = self.follow(name)
        target = SYMREF + other
        self._refs[name] = target
        self._notify(name, target)
        self._log(
            name,
            previous,
            target,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        Only the entry stored under name is compared and replaced; a chain of
        symbolic refs is not walked.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally. A missing ref compares as ZERO_SHA.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the set was successful, False otherwise.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # The name is validated only once the comparison has passed.
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Store a ref unless an entry with that name already exists.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the add was successful, False otherwise.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        Symbolic references are not followed. Removing a ref that is not
        present still counts as success.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally. A missing ref compares as ZERO_SHA.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the delete was successful, False otherwise.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name not in self._refs:
            # Nothing stored under this name: no notification, no log entry.
            return True
        previous = self._refs.pop(name)
        self._notify(name, None)
        self._log(
            name,
            previous,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Return the cached peeled value for name, or None if there is none."""
        return self._peeled.get(name)

    def _update(self, refs: dict[bytes, bytes]) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname, value in refs.items():
            self.set_if_equals(refname, None, value)

    def _update_peeled(self, peeled: dict[bytes, bytes]) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)

765 

766 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f: BinaryIO) -> None:
        """Initialize InfoRefsContainer from info/refs file.

        Args:
          f: Binary file object handed to read_info_refs
        """
        # Initialise the base class so inherited state (``_logger``) exists
        # and inherited helpers such as ``_log`` cannot fail on a missing
        # attribute.
        super().__init__()
        self._refs: dict[bytes, bytes] = {}
        self._peeled: dict[bytes, bytes] = {}
        refs = read_info_refs(f)
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self) -> set[bytes]:
        """Return all reference keys."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference.

        Returns: The value stored for name, or None if it is unknown
        """
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Get packed references; always empty for this container."""
        return {}

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Get peeled version of a reference.

        Returns: The peeled value when one was read, otherwise the value of
            the ref itself

        Raises:
          KeyError: if name is not a known ref
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]

795 

796 

797class DiskRefsContainer(RefsContainer): 

798 """Refs container that reads refs from disk.""" 

799 

800 def __init__( 

801 self, 

802 path: Union[str, bytes, os.PathLike], 

803 worktree_path: Optional[Union[str, bytes, os.PathLike]] = None, 

804 logger: Optional[ 

805 Callable[ 

806 [ 

807 bytes, 

808 Optional[bytes], 

809 Optional[bytes], 

810 Optional[bytes], 

811 Optional[int], 

812 Optional[int], 

813 Optional[bytes], 

814 ], 

815 None, 

816 ] 

817 ] = None, 

818 ) -> None: 

819 """Initialize DiskRefsContainer.""" 

820 super().__init__(logger=logger) 

821 # Convert path-like objects to strings, then to bytes for Git compatibility 

822 self.path = os.fsencode(os.fspath(path)) 

823 if worktree_path is None: 

824 self.worktree_path = self.path 

825 else: 

826 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

827 self._packed_refs: Optional[dict[bytes, bytes]] = None 

828 self._peeled_refs: Optional[dict[bytes, bytes]] = None 

829 

830 def __repr__(self) -> str: 

831 """Return string representation of DiskRefsContainer.""" 

832 return f"{self.__class__.__name__}({self.path!r})" 

833 

834 def _iter_dir( 

835 self, 

836 path: bytes, 

837 base: bytes, 

838 dir_filter: Optional[Callable[[bytes], bool]] = None, 

839 ) -> Iterator[bytes]: 

840 refspath = os.path.join(path, base.rstrip(b"/")) 

841 prefix_len = len(os.path.join(path, b"")) 

842 

843 for root, dirs, files in os.walk(refspath): 

844 directory = root[prefix_len:] 

845 if os.path.sep != "/": 

846 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

847 if dir_filter is not None: 

848 dirs[:] = [ 

849 d for d in dirs if dir_filter(b"/".join([directory, d, b""])) 

850 ] 

851 

852 for filename in files: 

853 refname = b"/".join([directory, filename]) 

854 if check_ref_format(refname): 

855 yield refname 

856 

857 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]: 

858 base = base.rstrip(b"/") + b"/" 

859 search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]] = [] 

860 if base != b"refs/": 

861 path = self.worktree_path if is_per_worktree_ref(base) else self.path 

862 search_paths.append((path, None)) 

863 elif self.worktree_path == self.path: 

864 # Iterate through all the refs from the main worktree 

865 search_paths.append((self.path, None)) 

866 else: 

867 # Iterate through all the shared refs from the commondir, excluding per-worktree refs 

868 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r))) 

869 # Iterate through all the per-worktree refs from the worktree's gitdir 

870 search_paths.append((self.worktree_path, is_per_worktree_ref)) 

871 

872 for path, dir_filter in search_paths: 

873 yield from self._iter_dir(path, base, dir_filter=dir_filter) 

874 

875 def subkeys(self, base: bytes) -> set[bytes]: 

876 """Return subkeys under a given base reference path.""" 

877 subkeys = set() 

878 

879 for key in self._iter_loose_refs(base): 

880 if key.startswith(base): 

881 subkeys.add(key[len(base) :].strip(b"/")) 

882 

883 for key in self.get_packed_refs(): 

884 if key.startswith(base): 

885 subkeys.add(key[len(base) :].strip(b"/")) 

886 return subkeys 

887 

888 def allkeys(self) -> set[bytes]: 

889 """Return all reference keys.""" 

890 allkeys = set() 

891 if os.path.exists(self.refpath(HEADREF)): 

892 allkeys.add(HEADREF) 

893 

894 allkeys.update(self._iter_loose_refs()) 

895 allkeys.update(self.get_packed_refs()) 

896 return allkeys 

897 

898 def refpath(self, name: bytes) -> bytes: 

899 """Return the disk path of a ref.""" 

900 path = name 

901 if os.path.sep != "/": 

902 path = path.replace(b"/", os.fsencode(os.path.sep)) 

903 

904 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path 

905 return os.path.join(root_dir, path) 

906 

907 def get_packed_refs(self) -> dict[bytes, bytes]: 

908 """Get contents of the packed-refs file. 

909 

910 Returns: Dictionary mapping ref names to SHA1s 

911 

912 Note: Will return an empty dictionary when no packed-refs file is 

913 present. 

914 """ 

915 # TODO: invalidate the cache on repacking 

916 if self._packed_refs is None: 

917 # set both to empty because we want _peeled_refs to be 

918 # None if and only if _packed_refs is also None. 

919 self._packed_refs = {} 

920 self._peeled_refs = {} 

921 path = os.path.join(self.path, b"packed-refs") 

922 try: 

923 f = GitFile(path, "rb") 

924 except FileNotFoundError: 

925 return {} 

926 with f: 

927 first_line = next(iter(f)).rstrip() 

928 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

929 for sha, name, peeled in read_packed_refs_with_peeled(f): 

930 self._packed_refs[name] = sha 

931 if peeled: 

932 self._peeled_refs[name] = peeled 

933 else: 

934 f.seek(0) 

935 for sha, name in read_packed_refs(f): 

936 self._packed_refs[name] = sha 

937 return self._packed_refs 

938 

939 def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None: 

940 """Add the given refs as packed refs. 

941 

942 Args: 

943 new_refs: A mapping of ref names to targets; if a target is None that 

944 means remove the ref 

945 """ 

946 if not new_refs: 

947 return 

948 

949 path = os.path.join(self.path, b"packed-refs") 

950 

951 with GitFile(path, "wb") as f: 

952 # reread cached refs from disk, while holding the lock 

953 packed_refs = self.get_packed_refs().copy() 

954 

955 for ref, target in new_refs.items(): 

956 # sanity check 

957 if ref == HEADREF: 

958 raise ValueError("cannot pack HEAD") 

959 

960 # remove any loose refs pointing to this one -- please 

961 # note that this bypasses remove_if_equals as we don't 

962 # want to affect packed refs in here 

963 with suppress(OSError): 

964 os.remove(self.refpath(ref)) 

965 

966 if target is not None: 

967 packed_refs[ref] = target 

968 else: 

969 packed_refs.pop(ref, None) 

970 

971 write_packed_refs(f, packed_refs, self._peeled_refs) 

972 

973 self._packed_refs = packed_refs 

974 

975 def get_peeled(self, name: bytes) -> Optional[bytes]: 

976 """Return the cached peeled value of a ref, if available. 

977 

978 Args: 

979 name: Name of the ref to peel 

980 Returns: The peeled value of the ref. If the ref is known not point to 

981 a tag, this will be the SHA the ref refers to. If the ref may point 

982 to a tag, but no cached information is available, None is returned. 

983 """ 

984 self.get_packed_refs() 

985 if ( 

986 self._peeled_refs is None 

987 or self._packed_refs is None 

988 or name not in self._packed_refs 

989 ): 

990 # No cache: no peeled refs were read, or this ref is loose 

991 return None 

992 if name in self._peeled_refs: 

993 return self._peeled_refs[name] 

994 else: 

995 # Known not peelable 

996 return self[name] 

997 

def read_loose_ref(self, name: bytes) -> Optional[bytes]:
    """Read a loose reference file and return its contents.

    If the reference file is a symbolic reference, only the first line of
    the file is read. Otherwise, only the first 40 bytes are read.

    Args:
      name: the refname to read, relative to refpath
    Returns: The contents of the ref file, or None if the file does not
      exist or cannot be read (any OSError/UnicodeError is treated as
      "no such loose ref").
    """
    filename = self.refpath(name)
    try:
        with GitFile(filename, "rb") as f:
            header = f.read(len(SYMREF))
            if header == SYMREF:
                # Read only the remainder of the first line. readline()
                # returns b"" at EOF, whereas next(iter(f)) would leak a
                # StopIteration to the caller for a file that ends right
                # after the symref marker.
                # NOTE(review): assumes the object returned by GitFile in
                # "rb" mode offers readline() like a regular binary file.
                return header + f.readline().rstrip(b"\r\n")
            # Read only the first 40 bytes
            return header + f.read(40 - len(SYMREF))
    except (OSError, UnicodeError):
        # don't assume anything specific about the error; in
        # particular, invalid or forbidden paths can raise weird
        # errors depending on the specific operating system
        return None

1027 

1028 def _remove_packed_ref(self, name: bytes) -> None: 

1029 if self._packed_refs is None: 

1030 return 

1031 filename = os.path.join(self.path, b"packed-refs") 

1032 # reread cached refs from disk, while holding the lock 

1033 f = GitFile(filename, "wb") 

1034 try: 

1035 self._packed_refs = None 

1036 self.get_packed_refs() 

1037 

1038 if self._packed_refs is None or name not in self._packed_refs: 

1039 f.abort() 

1040 return 

1041 

1042 del self._packed_refs[name] 

1043 if self._peeled_refs is not None: 

1044 with suppress(KeyError): 

1045 del self._peeled_refs[name] 

1046 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

1047 f.close() 

1048 except BaseException: 

1049 f.abort() 

1050 raise 

1051 

def set_symbolic_ref(
    self,
    name: bytes,
    other: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> None:
    """Turn ``name`` into a symbolic ref pointing at ``other``.

    Both names are validated first. The new content is written through a
    GitFile and only committed (closed) if writing and logging succeed;
    on any exception the write is aborted and the exception re-raised.

    Args:
      name: Name of the ref to set
      other: Name of the ref to point at
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message to describe the change
    """
    for refname in (name, other):
        self._check_refname(refname)
    lock = GitFile(self.refpath(name), "wb")
    try:
        lock.write(SYMREF + other + b"\n")
        # The same value is logged as both old and new SHA.
        sha = self.follow(name)[-1]
        self._log(
            name,
            sha,
            sha,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    except BaseException:
        lock.abort()
        raise
    lock.close()

1092 

def set_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    new_ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Point a ref at new_ref, but only if it currently equals old_ref.

    Symbolic references are followed, so the ref at the end of the chain
    is the one that is compared and written. Usable as an atomic
    compare-and-swap.

    Args:
      name: The refname to set.
      old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
      new_ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Set message for reflog
    Returns: True if the set was successful, False otherwise.
    Raises:
      NotADirectoryError: if an ancestor path of the target ref is itself
        a packed ref.
    """
    self._check_refname(name)
    try:
        chain, _ = self.follow(name)
        target = chain[-1]
    except (KeyError, IndexError, SymrefLoop):
        # Unresolvable: operate on the given name itself.
        target = name
    target_path = self.refpath(target)

    # make sure none of the ancestor folders is in packed refs
    ancestor = os.path.dirname(target)
    packed_refs = self.get_packed_refs()
    while ancestor:
        if packed_refs.get(ancestor, None) is not None:
            raise NotADirectoryError(target_path)
        ancestor = os.path.dirname(ancestor)

    ensure_dir_exists(os.path.dirname(target_path))
    with GitFile(target_path, "wb") as lock:
        if old_ref is not None:
            try:
                # read again while holding the lock to handle race conditions
                existing = self.read_loose_ref(target)
                if existing is None:
                    existing = self.get_packed_refs().get(target, ZERO_SHA)
                if existing != old_ref:
                    lock.abort()
                    return False
            except OSError:
                lock.abort()
                raise

        # Check if ref already has the desired value while holding the lock
        # This avoids fsync when ref is unchanged but still detects lock conflicts
        present = self.read_loose_ref(target)
        if present is None:
            present = packed_refs.get(target, None)

        if present is not None and present == new_ref:
            # Ref already has desired value, abort write to avoid fsync
            lock.abort()
            return True

        try:
            lock.write(new_ref + b"\n")
        except OSError:
            lock.abort()
            raise
        self._log(
            target,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True

1176 

def add_if_new(
    self,
    name: bytes,
    ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Add a new reference only if it does not already exist.

    This method follows symrefs, and only ensures that the last ref in the
    chain does not exist.

    Args:
      name: The refname to set.
      ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message for reflog
    Returns: True if the add was successful, False otherwise.
    """
    try:
        realnames, contents = self.follow(name)
        if contents is not None:
            return False
        realname = realnames[-1]
    except (KeyError, IndexError):
        realname = name
    self._check_refname(realname)
    filename = self.refpath(realname)
    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as f:
        # Re-check existence while holding the lock. Both checks must look
        # at the resolved ref (the one whose file is locked and written),
        # not at the possibly-symbolic name that was passed in.
        if os.path.exists(filename) or realname in self.get_packed_refs():
            f.abort()
            return False
        try:
            f.write(ref + b"\n")
        except OSError:
            f.abort()
            raise
        else:
            self._log(
                name,
                None,
                ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
    return True

1230 

def remove_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Delete a ref, but only if it currently equals old_ref.

    Symbolic references are not followed. Usable as an atomic
    compare-and-delete.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    ref_path = self.refpath(name)
    ensure_dir_exists(os.path.dirname(ref_path))
    lock = GitFile(ref_path, "wb")
    try:
        if old_ref is not None:
            existing = self.read_loose_ref(name)
            if existing is None:
                existing = self.get_packed_refs().get(name, ZERO_SHA)
            if existing != old_ref:
                return False

        # remove the reference file itself
        try:
            loose_present = os.path.lexists(ref_path)
        except OSError:
            # may only be packed, or otherwise unstorable
            loose_present = False

        if loose_present:
            os.remove(ref_path)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        lock.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    parent = name
    while b"/" in parent:
        parent = parent.rsplit(b"/", 1)[0]
        if parent == b"refs":
            break
        try:
            os.rmdir(self.refpath(parent))
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True

1314 

def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    HEAD is never packed, and refs that cannot be resolved or resolve to
    an empty value are skipped.

    Args:
      all: If True, pack all refs. If False, only pack tags.
    """
    candidates: dict[Ref, Optional[ObjectID]] = {}
    for refname in self.allkeys():
        if refname == HEADREF:
            # Never pack HEAD
            continue
        if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            sha = self[refname]
        except KeyError:
            # Broken ref, skip it
            continue
        if sha:
            candidates[refname] = sha

    if candidates:
        self.add_packed_refs(candidates)

1337 

1338 

def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
    """Parse one ``<sha> <name>`` ref line into a ``(sha, name)`` tuple.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the SHA is not valid hex, or the ref name
        fails the format check.
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    hexsha, refname = parts
    if not valid_hexsha(hexsha):
        raise PackedRefsException(f"Invalid hex sha {hexsha!r}")
    if not check_ref_format(refname):
        raise PackedRefsException(f"invalid ref name {refname!r}")
    return hexsha, refname

1350 

1351 

def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
    """Iterate over the entries of a packed refs file without peeled data.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a peeled ("^") line is encountered.
    """
    for raw in f:
        marker = raw[:1]
        if marker == b"#":
            # Comment
            continue
        if marker == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(raw)

1366 

1367 

def read_packed_refs_with_peeled(
    f: IO[bytes],
) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of (SHA1, ref name, peeled SHA1 or None).

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled ("^") line has no preceding ref
        line, or carries an invalid hex SHA.
    """
    # A ref line is held back until the next line shows whether a peeled
    # ("^") entry belongs to it.
    last = None
    for line in f:
        # Indexing bytes yields an int in Python 3, so comparing line[0]
        # with b"#" never matches; use startswith to actually skip comments.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            if not valid_hexsha(line[1:]):
                raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, line[1:])
        else:
            if last:
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        # Final ref line had no peeled entry after it.
        sha, name = _split_ref_line(last)
        yield (sha, name, None)

1400 

1401 

def write_packed_refs(
    f: IO[bytes],
    packed_refs: dict[bytes, bytes],
    peeled_refs: Optional[dict[bytes, bytes]] = None,
) -> None:
    """Serialize packed refs to a file, sorted by ref name.

    When ``peeled_refs`` is given (even if empty) the
    "# pack-refs with: peeled" header is written first, and each ref that
    has a peeled entry is followed by a "^<peeled sha>" line.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
        peeled_lookup = peeled_refs
    else:
        peeled_lookup = {}
    for refname in sorted(packed_refs):
        f.write(git_line(packed_refs[refname], refname))
        if refname in peeled_lookup:
            f.write(b"^" + peeled_lookup[refname] + b"\n")

1422 

1423 

def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
    """Parse an info/refs file.

    Each line is expected to be ``<sha>\\t<name>``; trailing CR/LF
    characters are stripped before splitting on the first tab.

    Args:
      f: File-like object to read from

    Returns:
      Dictionary mapping ref names to SHA1s
    """
    entries = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {refname: sha for sha, refname in entries}

1438 

1439 

def write_info_refs(
    refs: dict[bytes, bytes], store: ObjectContainer
) -> Iterator[bytes]:
    """Generate the lines of an info/refs file.

    Refs are emitted in sorted order as ``<sha>\\t<name>\\n``. HEAD and refs
    whose object is missing from ``store`` are skipped. When the peeled
    object differs from the ref's object, an extra line for the peeled
    name is emitted after it.

    Args:
      refs: Dictionary mapping ref names to SHAs
      store: Object container used to look up and peel the objects
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for refname, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if refname == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _, peeled = peel_sha(store, sha)
        yield obj.id + b"\t" + refname + b"\n"
        if obj.id != peeled.id:
            yield peeled.id + b"\t" + refname + PEELED_TAG_SUFFIX + b"\n"

1460 

1461 

def is_local_branch(x: bytes) -> bool:
    """Return True if the ref name starts with the local branch prefix."""
    prefix = LOCAL_BRANCH_PREFIX
    return x[: len(prefix)] == prefix

1465 

1466 

# Either flavour of refs dict: values always present, or possibly None.
T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])


def strip_peeled_refs(refs: T) -> T:
    """Return a copy of ``refs`` without the entries for peeled refs.

    An entry counts as peeled when its name ends with the peeled tag suffix.
    """
    kept = {}
    for refname, sha in refs.items():
        if refname.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[refname] = sha
    return kept

1475 

1476 

def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
    """Separate peeled entries from regular refs.

    Returns:
      A tuple ``(regular, peeled)``. ``regular`` holds every entry whose
      name lacks the peeled tag suffix. ``peeled`` maps the name with the
      suffix removed to the peeled SHA; peeled entries whose value is None
      are dropped.
    """
    regular = {}
    peeled: dict[bytes, bytes] = {}
    suffix_len = len(PEELED_TAG_SUFFIX)

    for refname, sha in refs.items():
        if not refname.endswith(PEELED_TAG_SUFFIX):
            regular[refname] = sha
        elif sha is not None:
            # Only add to peeled dict if sha is not None
            peeled[refname[:-suffix_len]] = sha

    return regular, peeled

1489 

1490 

def _set_origin_head(
    refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
) -> None:
    """Point refs/remotes/<origin>/HEAD at the remote-tracking branch.

    Nothing happens unless ``origin_head`` names a local branch whose
    remote-tracking counterpart already exists in ``refs``.

    Args:
      refs: Refs container to update
      origin: Name of the remote
      origin_head: Ref that the remote's HEAD points at, if known
    """
    if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    # set refs/remotes/origin/HEAD
    remote_prefix = b"refs/remotes/" + origin + b"/"
    symref_name = remote_prefix + HEADREF
    tracking_ref = remote_prefix + origin_head[len(LOCAL_BRANCH_PREFIX) :]
    if tracking_ref in refs:
        refs.set_symbolic_ref(symref_name, tracking_ref)

1501 

1502 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    Args:
      refs: Refs container to update
      origin: Name of the remote
      origin_head: Ref that the remote's HEAD points at; used when no
        ``branch`` is given
      branch: Short name of a branch or tag to use; takes precedence over
        ``origin_head`` when non-empty
      ref_message: Reflog message for any ref that gets created

    Returns:
      The ref that HEAD should refer to.

    Raises:
      ValueError: if ``branch`` is neither a remote-tracking branch nor a
        local tag, or if neither ``branch`` nor ``origin_head`` is given.
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            # Pass the message by keyword: the third positional parameter
            # of add_if_new is the committer, not the reflog message.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif LOCAL_TAG_PREFIX + branch in refs:
            head_ref = LOCAL_TAG_PREFIX + branch
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The source ref is absent; still report head_ref to the caller.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref

1535 

1536 

def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
) -> Optional[bytes]:
    """Update HEAD to refer to ``head_ref``.

    A tag ref detaches HEAD at the tag's value; any other ref makes HEAD a
    symbolic ref to it.

    Args:
      refs: Refs container to update
      head_ref: Ref that HEAD should refer to
      ref_message: Reflog message for the HEAD update

    Returns:
      The value HEAD was set to, or None if a non-tag ``head_ref`` does
      not exist.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            head = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            head = None
        return head

    # detach HEAD at specified tag
    head = refs[head_ref]
    if isinstance(head, Tag):
        # NOTE(review): only reached if the container hands back a Tag
        # object rather than a SHA - confirm whether that can happen.
        _cls, obj = head.object
        head = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, head, message=ref_message)
    return head

1557 

1558 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, Optional[bytes]],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into a refs container.

    Branches are imported under refs/remotes/<remote_name>, tags under the
    local tag prefix. Peeled entries and refs without a value are ignored.

    Args:
      refs_container: Refs container to import into
      remote_name: Name of the remote
      refs: Refs advertised by the remote
      message: Optional reflog message
      prune: Passed as ``prune`` when importing branches
      prune_tags: Passed as ``prune`` when importing tags
    """
    stripped = strip_peeled_refs(refs)

    branches = {}
    for refname, sha in stripped.items():
        if sha is not None and refname.startswith(LOCAL_BRANCH_PREFIX):
            branches[refname[len(LOCAL_BRANCH_PREFIX) :]] = sha
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        branches,
        message=message,
        prune=prune,
    )

    tags = {}
    for refname, sha in stripped.items():
        if not refname.startswith(LOCAL_TAG_PREFIX):
            continue
        if refname.endswith(PEELED_TAG_SUFFIX) or sha is None:
            continue
        tags[refname[len(LOCAL_TAG_PREFIX) :]] = sha
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )

1589 

1590 

def serialize_refs(
    store: ObjectContainer, refs: dict[bytes, bytes]
) -> dict[bytes, bytes]:
    """Build a refs dictionary that also carries peeled tag entries.

    Refs whose SHA cannot be found in ``store`` are left out and reported
    with a UserWarning.

    Args:
      store: Object store to peel refs from
      refs: Dictionary of ref names to SHAs

    Returns:
      Dictionary with refs and peeled refs (marked with ^{})
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for refname, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            warnings.warn(
                "ref {} points at non-present sha {}".format(
                    refname.decode("utf-8", "replace"), sha.decode("ascii")
                ),
                UserWarning,
            )
            continue
        if isinstance(unpeeled, Tag):
            # Tags get an additional entry for the object they peel to.
            serialized[refname + PEELED_TAG_SUFFIX] = peeled.id
        serialized[refname] = unpeeled.id
    return serialized

1623 

1624 

class locked_ref:
    """Hold the lock on a ref while it is inspected or modified.

    Use as a context manager: the lock is taken on entry; on exit pending
    writes are committed, unless an exception occurred or the ref was
    deleted, in which case they are discarded.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        """Initialize a locked ref.

        Args:
          refs_container: The DiskRefsContainer to lock the ref in
          refname: The ref name to lock
        """
        self._refs_container = refs_container
        self._refname = refname
        # Lock file; set by __enter__.
        self._file: Optional[_GitFile] = None
        # Ref actually locked: end of the symref chain when resolvable.
        self._realname: Optional[Ref] = None
        # True once delete() was called and not followed by a write.
        self._deleted = False

    def __enter__(self) -> "locked_ref":
        """Enter the context manager and acquire the lock.

        Returns:
          This locked_ref instance

        Raises:
          OSError: If the lock cannot be acquired
        """
        container = self._refs_container
        container._check_refname(self._refname)
        try:
            chain, _ = container.follow(self._refname)
            self._realname = chain[-1]
        except (KeyError, IndexError, SymrefLoop):
            self._realname = self._refname

        lock_path = container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(lock_path))
        self._file = GitFile(lock_path, "wb")
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        """Exit the context manager and release the lock.

        Args:
          exc_type: Type of exception if one occurred
          exc_value: Exception instance if one occurred
          traceback: Traceback if an exception occurred
        """
        held = self._file
        if not held:
            return
        discard = exc_type is not None or self._deleted
        if discard:
            held.abort()
        else:
            held.close()

    def get(self) -> Optional[bytes]:
        """Get the current value of the ref.

        The loose ref is consulted first, then the packed refs.

        Raises:
          RuntimeError: if called outside of the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        assert self._realname is not None
        container = self._refs_container
        value = container.read_loose_ref(self._realname)
        if value is not None:
            return value
        return container.get_packed_refs().get(self._realname, None)

    def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
        """Ensure the ref currently equals the expected value.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        return self.get() == expected_value

    def set(self, new_ref: bytes) -> None:
        """Set the ref to a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value

        Raises:
          RuntimeError: if called outside of the context manager
          ValueError: if new_ref is neither a valid hex SHA nor a symref
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        acceptable = valid_hexsha(new_ref) or new_ref.startswith(SYMREF)
        if not acceptable:
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Replace anything written earlier under this lock.
        lock = self._file
        lock.seek(0)
        lock.truncate()
        lock.write(new_ref + b"\n")
        self._deleted = False

    def set_symbolic_ref(self, target: Ref) -> None:
        """Make this ref point at another ref.

        Args:
          target: Name of the ref to point at

        Raises:
          RuntimeError: if called outside of the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        self._refs_container._check_refname(target)
        # Replace anything written earlier under this lock.
        lock = self._file
        lock.seek(0)
        lock.truncate()
        lock.write(SYMREF + target + b"\n")
        self._deleted = False

    def delete(self) -> None:
        """Delete the ref file while holding the lock.

        Raises:
          RuntimeError: if called outside of the context manager
        """
        if not self._file:
            raise RuntimeError("locked_ref not in context")

        # Delete the actual ref file while holding the lock
        realname = self._realname
        if realname:
            ref_path = self._refs_container.refpath(realname)
            with suppress(FileNotFoundError):
                if os.path.lexists(ref_path):
                    os.remove(ref_path)
            self._refs_container._remove_packed_ref(realname)

        self._deleted = True

1757 

1758 

def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
    """Filter refs to only include those with a given prefix.

    Args:
      refs: A dictionary of refs.
      prefixes: The prefixes to filter by. Any iterable is accepted,
        including one-shot iterators.

    Returns:
      A new dictionary with the refs whose name starts with at least one
      of the prefixes.
    """
    # Materialise the prefixes once: re-iterating the argument for every
    # ref would exhaust a one-shot iterator after the first key. A tuple
    # also lets startswith test all prefixes in a single call.
    wanted = tuple(prefixes)
    filtered = {k: v for k, v in refs.items() if k.startswith(wanted)}
    return cast(T, filtered)

1768 

1769 

def is_per_worktree_ref(ref: bytes) -> bool:
    """Tell whether a reference is stored per worktree rather than shared.

    Per-worktree references are:
    - all pseudorefs, e.g. HEAD
    - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"

    All refs starting with "refs/" are shared, except for the ones listed above.

    See https://git-scm.com/docs/git-worktree#_refs.
    """
    if not ref.startswith(b"refs/"):
        # Anything outside refs/ is treated as a pseudoref.
        return True
    per_worktree_namespaces = (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
    return ref.startswith(per_worktree_namespaces)