Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

797 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import types 

27import warnings 

28from collections.abc import Iterable, Iterator, Mapping 

29from contextlib import suppress 

30from typing import ( 

31 IO, 

32 TYPE_CHECKING, 

33 Any, 

34 BinaryIO, 

35 Callable, 

36 Optional, 

37 TypeVar, 

38 Union, 

39) 

40 

41if TYPE_CHECKING: 

42 from .file import _GitFile 

43 

44from .errors import PackedRefsException, RefFormatError 

45from .file import GitFile, ensure_dir_exists 

46from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

47from .pack import ObjectContainer 

48 

# Ref names are handled as raw bytes throughout this module.
Ref = bytes

# Name of the HEAD ref; it is special-cased by RefsContainer._check_refname.
HEADREF = b"HEAD"
# Marker that prefixes the contents of a symbolic ref (see parse_symref_value).
SYMREF = b"ref: "
# Ref name prefixes for the standard namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
LOCAL_REPLACE_PREFIX = b"refs/replace/"
# Byte values (ints) that check_ref_format rejects anywhere in a ref name:
# DEL, space, "~", "^", ":", "?", "*" and "[".
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# Suffix presumably appended to a tag's ref name to denote its peeled
# value -- TODO confirm against the code that consumes it.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

63 

64 

class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    Attributes:
      ref: Name of the ref whose resolution was abandoned
      depth: Number of symref hops that had been followed at that point
    """

    def __init__(self, ref: bytes, depth: int) -> None:
        """Initialize SymrefLoop exception.

        Args:
          ref: Name of the ref that was being resolved
          depth: Number of symrefs followed when the loop was detected
        """
        # Hand the values to Exception as well, so that ``args`` and
        # ``str()`` are informative and copy/pickle can rebuild the instance
        # (both re-invoke the constructor with ``args``).
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth

72 

73 

def parse_symref_value(contents: bytes) -> bytes:
    """Extract the destination ref name from the contents of a symref.

    Args:
      contents: Raw ref contents, expected to start with the symref marker
    Returns: The destination, with any trailing CR/LF characters removed
    Raises:
      ValueError: If contents does not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF) :]
    return destination.rstrip(b"\r\n")

84 

85 

def check_ref_format(refname: Ref) -> bool:
    """Check if a refname is correctly formatted.

    Follows the rules of git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # These could be combined into one big expression, but are listed
    # separately to parallel [1].
    if b"/." in refname or refname.startswith(b"."):
        return False
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # Iterating over bytes yields the integer byte values directly.
    if any(c < 0o40 or c in BAD_REF_CHARS for c in refname):
        return False
    # refname is non-empty here: it contains at least one slash.
    if refname.endswith((b"/", b".")):
        return False
    # The ".lock" rule applies to every slash-separated component, not just
    # the last one: a "foo.lock" directory would collide with the lock file
    # of the ref "foo".
    if any(component.endswith(b".lock") for component in refname.split(b"/")):
        return False
    if b"@{" in refname:
        return False
    if b"\\" in refname:
        return False
    return True

118 

119 

def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
    """Split a remote-tracking ref into its remote and branch parts.

    Args:
      ref: Remote ref like b"refs/remotes/origin/main"

    Returns:
      Tuple of (remote_name, branch_name); everything after the first slash
      following the remote name belongs to the branch name

    Raises:
      ValueError: If ref does not start with the remote prefix, or has no
        slash separating the remote name from the branch name
    """
    if not ref.startswith(LOCAL_REMOTE_PREFIX):
        raise ValueError(f"Not a remote ref: {ref!r}")

    without_prefix = ref[len(LOCAL_REMOTE_PREFIX) :]
    remote_name, separator, branch_name = without_prefix.partition(b"/")
    if not separator:
        # No slash at all: there is nothing that could be a branch name.
        raise ValueError(f"Invalid remote ref format: {ref!r}")
    return (remote_name, branch_name)

145 

146 

class RefsContainer:
    """A container for refs.

    Subclasses provide the storage primitives (``allkeys``,
    ``read_loose_ref``, ``get_packed_refs``, ``set_if_equals``,
    ``remove_if_equals``, ...), which raise NotImplementedError here; the
    lookup and convenience methods of this class are built on top of them.
    """

    def __init__(
        self,
        logger: Optional[
            Callable[
                [
                    bytes,
                    bytes,
                    bytes,
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    bytes,
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Initialize RefsContainer with optional logger function.

        Args:
          logger: Optional callable invoked by ``_log`` as
            ``logger(ref, old_sha, new_sha, committer, timestamp, timezone,
            message)``
        """
        self._logger = logger

    def _log(
        self,
        ref: bytes,
        old_sha: Optional[bytes],
        new_sha: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Report a ref change to the logger, if there is one.

        Nothing is reported when no logger was configured or when message is
        None.
        """
        if self._logger is None:
            return
        if message is None:
            return
        # Use ZERO_SHA for None values, matching git behavior
        if old_sha is None:
            old_sha = ZERO_SHA
        if new_sha is None:
            new_sha = ZERO_SHA
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name: bytes) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation has no cache and always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: Mapping[Ref, Optional[ObjectID]],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import refs from another repository.

        Args:
          base: Base ref to import into (e.g., b'refs/remotes/origin')
          other: Dictionary of refs to import; an entry whose value is None
            requests deletion of that ref
          committer: Optional committer for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
          prune: If True, remove refs not in other
        """
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
                continue
            self.set_if_equals(
                b"/".join((base, name)),
                None,
                value,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
            # A ref that was just imported must survive pruning.
            to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(
                b"/".join((base, ref)),
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )

    def allkeys(self) -> set[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self) -> Iterator[Ref]:
        """Iterate over all reference keys."""
        return iter(self.allkeys())

    def keys(self, base: Optional[bytes] = None) -> set[bytes]:
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        return self.allkeys()

    def subkeys(self, base: bytes) -> set[bytes]:
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under, with or without a trailing
            slash.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        # Normalise to exactly one trailing slash so that b"refs/heads" and
        # b"refs/heads/" behave alike and b"refs/headsfoo/x" is not treated
        # as living under b"refs/heads". An empty base selects everything.
        stem = base.rstrip(b"/")
        prefix = stem + b"/" if stem else b""
        prefix_len = len(prefix)
        return {
            refname[prefix_len:]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name: bytes) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD, not b"refs/stash", and is
            otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname: bytes) -> Optional[bytes]:
        """Read a reference without following any references.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            # Fall back to the packed refs when there is no loose ref.
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        Raises:
          SymrefLoop: if more than five symrefs had to be followed
        """
        contents: Optional[bytes] = SYMREF + name
        depth = 0
        refnames = []
        while contents and contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname: bytes) -> bool:
        """Check if a reference exists."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name: bytes) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the name cannot be resolved to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Optional message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name: bytes, ref: bytes) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          committer: Optional committer name/email
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name: bytes) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self) -> dict[bytes, bytes]:
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            ref_value = self.read_ref(src)
            if ref_value is None:
                # The ref is listed but cannot be read (for instance it went
                # away in the meantime); it cannot be a symref.
                continue
            try:
                dst = parse_symref_value(ref_value)
            except ValueError:
                # Not a symbolic ref.
                continue
            ret[src] = dst
        return ret

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)

546 

547 

class DictRefsContainer(RefsContainer):
    """In-memory RefsContainer that keeps its refs in a plain dict.

    There are no packed refs in this container and it is not threadsafe.
    """

    def __init__(
        self,
        refs: dict[bytes, bytes],
        logger: Optional[
            Callable[
                [
                    bytes,
                    Optional[bytes],
                    Optional[bytes],
                    Optional[bytes],
                    Optional[int],
                    Optional[int],
                    Optional[bytes],
                ],
                None,
            ]
        ] = None,
    ) -> None:
        """Create a container on top of an existing dict.

        Args:
          refs: Dictionary mapping ref names to values; it is stored as-is
            (not copied) and modified in place by this container
          logger: Optional logger function passed on to RefsContainer
        """
        super().__init__(logger=logger)
        self._watchers: set[Any] = set()
        self._peeled: dict[bytes, ObjectID] = {}
        self._refs = refs

    def allkeys(self) -> set[bytes]:
        """Return the names of all refs held in the dict."""
        return set(self._refs)

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Return the stored value for name, or None if there is none."""
        return self._refs.get(name)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Return the packed refs, which are always empty here."""
        return {}

    def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
        """Pass a (ref, new value) pair to every registered watcher."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Store a symbolic ref that points name at other.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog
        """
        # The value name resolved to before the change is what gets logged
        # as the old value.
        _, previous = self.follow(name)
        symref_contents = SYMREF + other
        self._refs[name] = symref_contents
        self._notify(name, symref_contents)
        self._log(
            name,
            previous,
            symref_contents,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        Only the entry stored under name is compared and replaced; a missing
        entry compares as ZERO_SHA.

        Args:
          name: The refname to set.
          old_ref: The value the entry must currently have, or None to set
            unconditionally.
          new_ref: The new value to store.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the set was successful, False otherwise.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Store a ref unless an entry with that name already exists.

        Args:
          name: Ref name
          ref: Ref value
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the add was successful, False otherwise.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        Symbolic references are not followed; a missing entry compares as
        ZERO_SHA. Removing an entry that does not exist still counts as
        success, but triggers no notification or log entry.

        Args:
          name: The refname to delete.
          old_ref: The value the entry must currently have, or None to
            delete unconditionally.
          committer: Optional committer name for reflog
          timestamp: Optional timestamp for reflog
          timezone: Optional timezone for reflog
          message: Optional message for reflog

        Returns:
          True if the delete was successful, False otherwise.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Return the cached peeled value for name, or None if not cached."""
        return self._peeled.get(name)

    def _update(self, refs: Mapping[bytes, bytes]) -> None:
        """Set several refs unconditionally; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname in refs:
            self.set_if_equals(refname, None, refs[refname])

    def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None:
        """Merge entries into the peeled cache; intended only for testing."""
        self._peeled.update(peeled)

770 

771 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file.

    The refs are read once, in the constructor; none of the mutating
    RefsContainer methods are implemented by this class.
    """

    def __init__(self, f: BinaryIO) -> None:
        """Initialize InfoRefsContainer from info/refs file.

        Args:
          f: Binary file-like object with the info/refs contents
        """
        # Initialise the base class so that the state it owns (the logger
        # slot read by _log) exists on this instance too.
        super().__init__()
        self._refs: dict[bytes, bytes] = {}
        self._peeled: dict[bytes, bytes] = {}
        refs = read_info_refs(f)
        (self._refs, self._peeled) = split_peeled_refs(refs)

    def allkeys(self) -> set[bytes]:
        """Return all reference keys."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference.

        Returns: The value read from info/refs, or None for an unknown name
        """
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
        """Get packed references; always empty for this container."""
        return {}

    def get_peeled(self, name: bytes) -> Optional[bytes]:
        """Get peeled version of a reference.

        Returns: The peeled value when one was listed for the ref, otherwise
            the value of the ref itself
        Raises:
          KeyError: if the ref is not present in this container
        """
        try:
            return self._peeled[name]
        except KeyError:
            # No peeled entry was listed: fall back to the ref's own value.
            return self._refs[name]

800 

801 

802class DiskRefsContainer(RefsContainer): 

803 """Refs container that reads refs from disk.""" 

804 

def __init__(
    self,
    path: Union[str, bytes, os.PathLike[str]],
    worktree_path: Optional[Union[str, bytes, os.PathLike[str]]] = None,
    logger: Optional[
        Callable[
            [
                bytes,
                bytes,
                bytes,
                Optional[bytes],
                Optional[int],
                Optional[int],
                bytes,
            ],
            None,
        ]
    ] = None,
) -> None:
    """Set up a refs container rooted at a directory on disk.

    Args:
      path: Directory holding the shared refs and the packed-refs file
      worktree_path: Directory holding the per-worktree refs; defaults to
        path when omitted
      logger: Optional logger function passed on to RefsContainer
    """
    super().__init__(logger=logger)
    # Paths are kept as bytes, whatever path-like form the caller used.
    self.path = os.fsencode(os.fspath(path))
    self.worktree_path = (
        self.path
        if worktree_path is None
        else os.fsencode(os.fspath(worktree_path))
    )
    # Both caches stay None until get_packed_refs() fills them.
    self._packed_refs: Optional[dict[bytes, bytes]] = None
    self._peeled_refs: Optional[dict[bytes, bytes]] = None

834 

835 def __repr__(self) -> str: 

836 """Return string representation of DiskRefsContainer.""" 

837 return f"{self.__class__.__name__}({self.path!r})" 

838 

def _iter_dir(
    self,
    path: bytes,
    base: bytes,
    dir_filter: Optional[Callable[[bytes], bool]] = None,
) -> Iterator[bytes]:
    """Yield the well-formed ref names found in files below path/base.

    Args:
      path: Root directory that ref names are relative to
      base: Ref name prefix (a subdirectory of path) to walk
      dir_filter: Optional predicate called with each subdirectory's ref
        path (with trailing slash); subdirectories for which it returns a
        false value are not descended into
    """
    start_dir = os.path.join(path, base.rstrip(b"/"))
    # Length of "path" plus its trailing separator: stripping this many
    # bytes from a walked directory leaves the part relative to path.
    strip_len = len(os.path.join(path, b""))

    for current_dir, subdirs, filenames in os.walk(start_dir):
        ref_dir = current_dir[strip_len:]
        if os.path.sep != "/":
            # Ref names always use forward slashes.
            ref_dir = ref_dir.replace(os.fsencode(os.path.sep), b"/")
        if dir_filter is not None:
            # Prune in place so that os.walk skips the rejected directories.
            subdirs[:] = [
                sub for sub in subdirs if dir_filter(ref_dir + b"/" + sub + b"/")
            ]

        for filename in filenames:
            candidate = ref_dir + b"/" + filename
            if check_ref_format(candidate):
                yield candidate

861 

def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
    """Yield the names of the loose refs stored under base.

    Args:
      base: Ref name prefix to search; a trailing slash is optional
    """
    base = base.rstrip(b"/") + b"/"

    def _is_shared(refname: bytes) -> bool:
        return not is_per_worktree_ref(refname)

    search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]]
    if base != b"refs/":
        # A specific namespace lives entirely in one of the two directories.
        root = self.worktree_path if is_per_worktree_ref(base) else self.path
        search_paths = [(root, None)]
    elif self.worktree_path == self.path:
        # Iterate through all the refs from the main worktree
        search_paths = [(self.path, None)]
    else:
        search_paths = [
            # Shared refs from the commondir, excluding per-worktree refs
            (self.path, _is_shared),
            # Per-worktree refs from the worktree's gitdir
            (self.worktree_path, is_per_worktree_ref),
        ]

    for root, dir_filter in search_paths:
        yield from self._iter_dir(root, base, dir_filter=dir_filter)

879 

def subkeys(self, base: bytes) -> set[bytes]:
    """Return subkeys under a given base reference path.

    Args:
      base: The base to return refs under, with or without a trailing slash
    Returns: The names of all loose and packed refs under base, with the
        base prefix removed
    """
    # Match on the base followed by a slash, so that a base such as
    # b"refs/heads" does not also pick up b"refs/headsfoo/x". An empty base
    # selects everything.
    stem = base.rstrip(b"/")
    prefix = stem + b"/" if stem else b""
    prefix_len = len(prefix)
    subkeys = set()

    for key in self._iter_loose_refs(base):
        if key.startswith(prefix):
            subkeys.add(key[prefix_len:].strip(b"/"))

    for key in self.get_packed_refs():
        if key.startswith(prefix):
            subkeys.add(key[prefix_len:].strip(b"/"))
    return subkeys

892 

def allkeys(self) -> set[bytes]:
    """Return the names of every ref: HEAD (if present), loose and packed."""
    keys = set()
    head_exists = os.path.exists(self.refpath(HEADREF))
    if head_exists:
        keys.add(HEADREF)

    for source in (self._iter_loose_refs(), self.get_packed_refs()):
        keys.update(source)
    return keys

902 

def refpath(self, name: bytes) -> bytes:
    """Return the filesystem path at which the ref called name is stored."""
    # Per-worktree refs live in the worktree directory, all others in the
    # shared one.
    if is_per_worktree_ref(name):
        root_dir = self.worktree_path
    else:
        root_dir = self.path

    relative = name
    if os.path.sep != "/":
        # Translate the ref's forward slashes to the native separator.
        relative = name.replace(b"/", os.fsencode(os.path.sep))
    return os.path.join(root_dir, relative)

911 

def get_packed_refs(self) -> dict[bytes, bytes]:
    """Get contents of the packed-refs file.

    The file is parsed on the first call and the result is cached on the
    instance; later calls return the cached dictionary.

    Returns: Dictionary mapping ref names to SHA1s

    Note: Will return an empty dictionary when no packed-refs file is
        present.
    """
    # TODO: invalidate the cache on repacking
    if self._packed_refs is not None:
        return self._packed_refs

    # Parse into locals and publish both caches together at the end, so
    # that _peeled_refs is None if and only if _packed_refs is None and a
    # failure half-way through the file does not leave a partial cache.
    packed_refs: dict[bytes, bytes] = {}
    peeled_refs: dict[bytes, bytes] = {}
    path = os.path.join(self.path, b"packed-refs")
    try:
        f = GitFile(path, "rb")
    except FileNotFoundError:
        self._packed_refs = packed_refs
        self._peeled_refs = peeled_refs
        return self._packed_refs
    with f:
        # An empty file has no first line; treat it as having no header.
        first_line = next(iter(f), b"").rstrip()
        if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
            for sha, name, peeled in read_packed_refs_with_peeled(f):
                packed_refs[name] = sha
                if peeled:
                    peeled_refs[name] = peeled
        else:
            # No "peeled" header: the first line is data, so start over.
            f.seek(0)
            for sha, name in read_packed_refs(f):
                packed_refs[name] = sha
    self._packed_refs = packed_refs
    self._peeled_refs = peeled_refs
    return self._packed_refs

943 

def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
    """Add the given refs as packed refs.

    Args:
      new_refs: A mapping of ref names to targets; if a target is None that
        means remove the ref
    Raises:
      ValueError: if new_refs contains HEAD
    """
    if not new_refs:
        return

    # sanity check -- done up front, before the packed-refs file is opened
    # for writing and before any loose ref has been deleted
    if HEADREF in new_refs:
        raise ValueError("cannot pack HEAD")

    path = os.path.join(self.path, b"packed-refs")

    with GitFile(path, "wb") as f:
        # start from the packed refs known to this container (read from
        # disk on first use), while holding the lock
        packed_refs = self.get_packed_refs().copy()

        for ref, target in new_refs.items():
            # remove any loose refs pointing to this one -- please
            # note that this bypasses remove_if_equals as we don't
            # want to affect packed refs in here
            with suppress(OSError):
                os.remove(self.refpath(ref))

            if target is not None:
                packed_refs[ref] = target
            else:
                packed_refs.pop(ref, None)

        write_packed_refs(f, packed_refs, self._peeled_refs)

    # Only replace the cache once the new file has been written and the
    # with-block has finished without raising.
    self._packed_refs = packed_refs

979 

def get_peeled(self, name: bytes) -> Optional[bytes]:
    """Look up the peeled value of a ref in the packed-refs cache.

    Args:
      name: Name of the ref to peel
    Returns: The cached peeled value if one is recorded for the ref. For a
        packed ref without a recorded peeled value, the value the ref
        resolves to. None when the ref is not packed or nothing is cached.
    """
    # Make sure the packed/peeled caches have been loaded.
    self.get_packed_refs()
    packed = self._packed_refs
    peeled = self._peeled_refs
    if peeled is None or packed is None:
        # No cache: no peeled refs were read
        return None
    if name not in packed:
        # This ref is loose (or unknown)
        return None
    if name not in peeled:
        # Known not peelable
        return self[name]
    return peeled[name]

1002 

def read_loose_ref(self, name: bytes) -> Optional[bytes]:
    """Read a reference file and return its contents.

    If the reference file a symbolic reference, only read the first line of
    the file. Otherwise, only read the first 40 bytes.

    Args:
      name: the refname to read, relative to refpath
    Returns: The contents of the ref file, or None if the file does not
        exist or cannot be read (any OSError or UnicodeError is treated as
        "no such ref").
    """
    filename = self.refpath(name)
    try:
        with GitFile(filename, "rb") as f:
            header = f.read(len(SYMREF))
            if header == SYMREF:
                # Read only the first line. The default keeps a file that
                # ends right after the marker from raising StopIteration.
                return header + next(iter(f), b"").rstrip(b"\r\n")
            # Read only the first 40 bytes
            return header + f.read(40 - len(SYMREF))
    except (OSError, UnicodeError):
        # don't assume anything specific about the error; in
        # particular, invalid or forbidden paths can raise weird
        # errors depending on the specific operating system
        return None

1032 

1033 def _remove_packed_ref(self, name: bytes) -> None: 

1034 if self._packed_refs is None: 

1035 return 

1036 filename = os.path.join(self.path, b"packed-refs") 

1037 # reread cached refs from disk, while holding the lock 

1038 f = GitFile(filename, "wb") 

1039 try: 

1040 self._packed_refs = None 

1041 self.get_packed_refs() 

1042 

1043 if self._packed_refs is None or name not in self._packed_refs: 

1044 f.abort() 

1045 return 

1046 

1047 del self._packed_refs[name] 

1048 if self._peeled_refs is not None: 

1049 with suppress(KeyError): 

1050 del self._peeled_refs[name] 

1051 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

1052 f.close() 

1053 except BaseException: 

1054 f.abort() 

1055 raise 

1056 

def set_symbolic_ref(
    self,
    name: bytes,
    other: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> None:
    """Turn ``name`` into a symbolic ref pointing at ``other``.

    Args:
      name: Name of the ref to set
      other: Name of the ref to point at
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message to describe the change
    """
    for refname in (name, other):
        self._check_refname(refname)
    lock = GitFile(self.refpath(name), "wb")
    try:
        lock.write(SYMREF + other + b"\n")
        # The log entry uses the value follow() reports for this ref as
        # both the old and the new value.
        resolved = self.follow(name)[-1]
        self._log(
            name,
            resolved,
            resolved,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    except BaseException:
        lock.abort()
        raise
    else:
        lock.close()

1097 

def set_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    new_ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Set a refname to new_ref only if it currently equals old_ref.

    This method follows all symbolic references, and can be used to perform
    an atomic compare-and-swap operation.

    Args:
      name: The refname to set.
      old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
      new_ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Set message for reflog
    Returns: True if the set was successful (including the case where the
      ref already holds new_ref and nothing is written), False if the
      current value does not match old_ref.

    Raises:
      NotADirectoryError: if an ancestor path of the resolved ref name is
        itself present in packed refs
      OSError: if reading the current value or writing the new one fails
    """
    self._check_refname(name)
    # Resolve the symref chain to the final ref name; if it cannot be
    # followed, operate on the given name directly.
    try:
        realnames, _ = self.follow(name)
        realname = realnames[-1]
    except (KeyError, IndexError, SymrefLoop):
        realname = name
    filename = self.refpath(realname)

    # make sure none of the ancestor folders is in packed refs
    probe_ref = os.path.dirname(realname)
    packed_refs = self.get_packed_refs()
    while probe_ref:
        if packed_refs.get(probe_ref, None) is not None:
            raise NotADirectoryError(filename)
        probe_ref = os.path.dirname(probe_ref)

    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as f:
        if old_ref is not None:
            try:
                # read again while holding the lock to handle race conditions
                orig_ref = self.read_loose_ref(realname)
                if orig_ref is None:
                    # No loose ref: compare against the packed value, with
                    # ZERO_SHA standing in for "ref does not exist".
                    orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                if orig_ref != old_ref:
                    f.abort()
                    return False
            except OSError:
                f.abort()
                raise

        # Check if ref already has the desired value while holding the lock
        # This avoids fsync when ref is unchanged but still detects lock conflicts
        current_ref = self.read_loose_ref(realname)
        if current_ref is None:
            # Falls back to the packed-refs mapping fetched before the
            # lock was taken.
            current_ref = packed_refs.get(realname, None)

        if current_ref is not None and current_ref == new_ref:
            # Ref already has desired value, abort write to avoid fsync
            f.abort()
            return True

        try:
            f.write(new_ref + b"\n")
        except OSError:
            f.abort()
            raise
        # Logged against the resolved name, with the caller-supplied
        # old_ref (which may be None) as the previous value.
        self._log(
            realname,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True

1181 

def add_if_new(
    self,
    name: bytes,
    ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Create a reference, but only when it does not exist yet.

    Symrefs are followed; only the last ref in the chain is required not
    to exist.

    Args:
      name: The refname to set.
      ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message for reflog
    Returns: True if the add was successful, False otherwise.
    """
    try:
        chain, current = self.follow(name)
        if current is not None:
            # The chain already resolves to a value: nothing to add.
            return False
        target = chain[-1]
    except (KeyError, IndexError):
        target = name
    self._check_refname(target)
    path = self.refpath(target)
    ensure_dir_exists(os.path.dirname(path))
    with GitFile(path, "wb") as lock:
        # NOTE(review): the packed-refs membership test (and the reflog
        # entry below) use ``name`` rather than the resolved target --
        # confirm this is intended.
        already_present = os.path.exists(path) or name in self.get_packed_refs()
        if already_present:
            lock.abort()
            return False
        try:
            lock.write(ref + b"\n")
        except OSError:
            lock.abort()
            raise
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True

1235 

def remove_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Delete a ref, provided it currently equals old_ref.

    Symbolic references are not followed. This can be used to perform an
    atomic compare-and-delete operation.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    path = self.refpath(name)
    ensure_dir_exists(os.path.dirname(path))
    lock = GitFile(path, "wb")
    try:
        if old_ref is not None:
            current = self.read_loose_ref(name)
            if current is None:
                # Not loose: fall back to the packed value, with ZERO_SHA
                # standing in for a missing ref.
                current = self.get_packed_refs().get(name, ZERO_SHA)
            if current != old_ref:
                return False

        # remove the reference file itself
        try:
            is_loose = os.path.lexists(path)
        except OSError:
            # may only be packed, or otherwise unstorable
            is_loose = False

        if is_loose:
            os.remove(path)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        lock.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    parent = name
    while b"/" in parent:
        parent = parent.rsplit(b"/", 1)[0]
        if parent == b"refs":
            break
        parent_path = self.refpath(parent)
        try:
            os.rmdir(parent_path)
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True

1319 

def pack_refs(self, all: bool = False) -> None:
    """Move loose refs into the packed-refs file.

    Args:
      all: If True, pack all refs. If False, only pack tags.
    """
    refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
    for ref in self.allkeys():
        if ref == HEADREF:
            # Never pack HEAD
            continue
        if not (all or ref.startswith(LOCAL_TAG_PREFIX)):
            continue
        try:
            sha = self[ref]
        except KeyError:
            # Broken ref, skip it
            continue
        if sha:
            refs_to_pack[ref] = sha

    # Only touch packed-refs when there is something to add.
    if refs_to_pack:
        self.add_packed_refs(refs_to_pack)

1342 

1343 

def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
    """Parse one ``<sha> <name>`` ref line into a (sha, name) tuple.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the SHA is not valid hex, or the name is
        not a well-formed ref name
    """
    stripped = line.rstrip(b"\n\r")
    pieces = stripped.split(b" ")
    if len(pieces) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    sha, name = pieces
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(name):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return sha, name

1355 

1356 

def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
    """Iterate over the entries of a packed refs file without peeled data.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.

    Raises:
      PackedRefsException: if a peeled ("^") line is encountered, or a
        ref line is malformed
    """
    for raw in f:
        marker = raw[:1]
        if marker == b"#":
            # Comment
            continue
        if marker == b"^":
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        yield _split_ref_line(raw)

1371 

1372 

def read_packed_refs_with_peeled(
    f: IO[bytes],
) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
    """Iterate over a packed refs file that may contain peeled entries.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of SHA1, ref name and peeled SHA1 (or None when the ref line is
    not followed by a "^" line).

    Args:
      f: file-like object to read from, seek'ed to the second line

    Raises:
      PackedRefsException: if a peeled line has no preceding ref line, a
        peeled SHA is not valid hex, or a ref line is malformed
    """
    # A ref line is held back until the next line shows whether a peeled
    # value follows it.
    pending = None
    for raw in f:
        if raw.startswith(b"#"):
            continue
        entry = raw.rstrip(b"\r\n")
        if not entry.startswith(b"^"):
            if pending:
                sha, name = _split_ref_line(pending)
                yield (sha, name, None)
            pending = entry
            continue

        if not pending:
            raise PackedRefsException("unexpected peeled ref line")
        peeled = entry[1:]
        if not valid_hexsha(peeled):
            raise PackedRefsException(f"Invalid hex sha {peeled!r}")
        sha, name = _split_ref_line(pending)
        pending = None
        yield (sha, name, peeled)

    # Flush the final ref line, which had no peeled line after it.
    if pending:
        sha, name = _split_ref_line(pending)
        yield (sha, name, None)

1405 

1406 

def write_packed_refs(
    f: IO[bytes],
    packed_refs: Mapping[bytes, bytes],
    peeled_refs: Optional[Mapping[bytes, bytes]] = None,
) -> None:
    """Serialize packed refs to a file.

    When peeled_refs is given (even if empty) the "# pack-refs with: peeled"
    header is written first, and each ref that has a peeled value is
    followed by a "^<peeled>" line. Refs are written sorted by name.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    peeled = {} if peeled_refs is None else peeled_refs
    for refname in sorted(packed_refs):
        f.write(git_line(packed_refs[refname], refname))
        if refname in peeled:
            f.write(b"^" + peeled[refname] + b"\n")

1427 

1428 

def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
    """Parse an info/refs file.

    Each line is expected to be ``<sha>\\t<name>``; trailing CR/LF
    characters are stripped before splitting on the first tab.

    Args:
      f: File-like object to read from

    Returns:
      Dictionary mapping ref names to SHA1s
    """
    pairs = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {name: sha for sha, name in pairs}

1443 

1444 

def write_info_refs(
    refs: Mapping[bytes, bytes], store: ObjectContainer
) -> Iterator[bytes]:
    """Generate the lines of an info/refs file.

    Refs are emitted sorted by name as ``<sha>\\t<name>\\n``. HEAD is never
    advertised, and refs whose object is missing from the store are
    skipped. When peeling yields a different object, an extra line for the
    peeled id with the peeled-tag suffix appended to the name follows.

    Args:
      refs: Dictionary of ref names to SHAs
      store: Object store used to look up and peel the objects
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for name, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name == HEADREF:
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _unpeeled, peeled = peel_sha(store, sha)
        yield b"".join((obj.id, b"\t", name, b"\n"))
        if obj.id != peeled.id:
            yield b"".join((peeled.id, b"\t", name, PEELED_TAG_SUFFIX, b"\n"))

1465 

1466 

def is_local_branch(x: bytes) -> bool:
    """Tell whether a ref name lives under the local branch prefix.

    Args:
      x: Ref name to check

    Returns:
      True if the name starts with the local branch prefix
    """
    has_branch_prefix = x.startswith(LOCAL_BRANCH_PREFIX)
    return has_branch_prefix

1470 

1471 

def local_branch_name(name: bytes) -> bytes:
    """Return the full branch ref for a short or already-full name.

    Args:
      name: Short branch name (e.g., b"master") or full ref

    Returns:
      Full branch ref name (e.g., b"refs/heads/master")

    Examples:
      >>> local_branch_name(b"master")
      b'refs/heads/master'
      >>> local_branch_name(b"refs/heads/master")
      b'refs/heads/master'
    """
    # Names that already carry the prefix are returned untouched.
    already_full = name.startswith(LOCAL_BRANCH_PREFIX)
    return name if already_full else LOCAL_BRANCH_PREFIX + name

1490 

1491 

def local_tag_name(name: bytes) -> bytes:
    """Return the full tag ref for a short or already-full name.

    Args:
      name: Short tag name (e.g., b"v1.0") or full ref

    Returns:
      Full tag ref name (e.g., b"refs/tags/v1.0")

    Examples:
      >>> local_tag_name(b"v1.0")
      b'refs/tags/v1.0'
      >>> local_tag_name(b"refs/tags/v1.0")
      b'refs/tags/v1.0'
    """
    # Names that already carry the prefix are returned untouched.
    already_full = name.startswith(LOCAL_TAG_PREFIX)
    return name if already_full else LOCAL_TAG_PREFIX + name

1510 

1511 

def local_replace_name(name: bytes) -> bytes:
    """Return the full replace ref for a short or already-full name.

    Args:
      name: Short replace name (object SHA) or full ref

    Returns:
      Full replace ref name (e.g., b"refs/replace/<sha>")

    Examples:
      >>> local_replace_name(b"abc123")
      b'refs/replace/abc123'
      >>> local_replace_name(b"refs/replace/abc123")
      b'refs/replace/abc123'
    """
    # Names that already carry the prefix are returned untouched.
    already_full = name.startswith(LOCAL_REPLACE_PREFIX)
    return name if already_full else LOCAL_REPLACE_PREFIX + name

1530 

1531 

def extract_branch_name(ref: bytes) -> bytes:
    """Strip the local branch prefix from a full branch ref.

    Args:
      ref: Full branch ref (e.g., b"refs/heads/master")

    Returns:
      Short branch name (e.g., b"master")

    Raises:
      ValueError: If ref is not a local branch

    Examples:
      >>> extract_branch_name(b"refs/heads/master")
      b'master'
      >>> extract_branch_name(b"refs/heads/feature/foo")
      b'feature/foo'
    """
    if ref.startswith(LOCAL_BRANCH_PREFIX):
        return ref.removeprefix(LOCAL_BRANCH_PREFIX)
    raise ValueError(f"Not a local branch ref: {ref!r}")

1553 

1554 

def extract_tag_name(ref: bytes) -> bytes:
    """Strip the local tag prefix from a full tag ref.

    Args:
      ref: Full tag ref (e.g., b"refs/tags/v1.0")

    Returns:
      Short tag name (e.g., b"v1.0")

    Raises:
      ValueError: If ref is not a local tag

    Examples:
      >>> extract_tag_name(b"refs/tags/v1.0")
      b'v1.0'
    """
    if ref.startswith(LOCAL_TAG_PREFIX):
        return ref.removeprefix(LOCAL_TAG_PREFIX)
    raise ValueError(f"Not a local tag ref: {ref!r}")

1574 

1575 

def shorten_ref_name(ref: bytes) -> bytes:
    """Return the short form of a full ref name.

    The branch, remote and tag prefixes are tried in that order; the first
    one that matches is removed. Anything else is returned unchanged.

    Args:
      ref: Full ref name (e.g., b"refs/heads/master")

    Returns:
      Short ref name (e.g., b"master")

    Examples:
      >>> shorten_ref_name(b"refs/heads/master")
      b'master'
      >>> shorten_ref_name(b"refs/remotes/origin/main")
      b'origin/main'
      >>> shorten_ref_name(b"refs/tags/v1.0")
      b'v1.0'
      >>> shorten_ref_name(b"HEAD")
      b'HEAD'
    """
    for prefix in (LOCAL_BRANCH_PREFIX, LOCAL_REMOTE_PREFIX, LOCAL_TAG_PREFIX):
        if ref.startswith(prefix):
            return ref[len(prefix) :]
    return ref

1602 

1603 

# Constrained type variable for ref dictionaries: either every value is a
# SHA (dict[bytes, bytes]) or values may be None
# (dict[bytes, Optional[bytes]]). Functions annotated with T declare that
# they return the same variant they were given.
T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])

1605 

1606 

def strip_peeled_refs(refs: T) -> T:
    """Return a copy of refs without the peeled entries.

    Args:
      refs: Dictionary of ref names to SHAs

    Returns:
      New dictionary holding only the refs whose name does not end with
      the peeled-tag suffix
    """
    kept = {}
    for ref, sha in refs.items():
        if ref.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[ref] = sha
    return kept

1612 

1613 

def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
    """Separate peeled entries from regular refs.

    Args:
      refs: Dictionary of ref names to SHAs, possibly containing names
        that end with the peeled-tag suffix

    Returns:
      Tuple of (regular refs, peeled refs). The peeled dictionary is keyed
      by the ref name with the suffix removed; peeled entries whose value
      is None are dropped.
    """
    regular = {}
    peeled: dict[bytes, bytes] = {}
    suffix_len = len(PEELED_TAG_SUFFIX)

    for ref, sha in refs.items():
        if not ref.endswith(PEELED_TAG_SUFFIX):
            regular[ref] = sha
        elif sha is not None:
            # Only add to peeled dict if sha is not None
            peeled[ref[:-suffix_len]] = sha

    return regular, peeled

1626 

1627 

def _set_origin_head(
    refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
) -> None:
    """Point refs/remotes/<origin>/HEAD at the remote's default branch.

    Nothing happens unless origin_head is a local branch ref whose
    remote-tracking counterpart exists in refs.

    Args:
      refs: Refs container to update
      origin: Name of the remote
      origin_head: Ref that the remote HEAD points at, if known
    """
    # set refs/remotes/origin/HEAD
    origin_base = b"refs/remotes/" + origin + b"/"
    if not (origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX)):
        return
    origin_ref = origin_base + HEADREF
    target_ref = origin_base + extract_branch_name(origin_head)
    if target_ref in refs:
        refs.set_symbolic_ref(origin_ref, target_ref)

1638 

1639 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: Optional[bytes],
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    Args:
      refs: Refs container to update
      origin: Name of the remote
      origin_head: Ref that the remote HEAD points at, if known
      branch: Explicitly requested branch or tag name; takes precedence
        over origin_head when given
      ref_message: Reflog message for any ref that gets created

    Returns:
      The ref that HEAD should be set to

    Raises:
      ValueError: if branch is given but matches neither a remote-tracking
        branch nor a local tag, or if neither branch nor origin_head is
        provided
    """
    origin_base = b"refs/remotes/" + origin + b"/"
    if branch:
        origin_ref = origin_base + branch
        if origin_ref in refs:
            local_ref = local_branch_name(branch)
            # The message must be passed by keyword: the third positional
            # parameter of add_if_new() is the committer.
            refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
            head_ref = local_ref
        elif local_tag_name(branch) in refs:
            head_ref = local_tag_name(branch)
        else:
            raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
    elif origin_head:
        head_ref = origin_head
        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
            origin_ref = origin_base + extract_branch_name(origin_head)
        else:
            origin_ref = origin_head
        try:
            refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
        except KeyError:
            # The remote ref is not available locally; leave head_ref
            # uncreated and still report it to the caller.
            pass
    else:
        raise ValueError("neither origin_head nor branch are provided")
    return head_ref

1672 

1673 

def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
) -> Optional[bytes]:
    """Update HEAD to refer to head_ref.

    For a tag ref, HEAD is detached at the tag's value. For any other ref,
    HEAD becomes a symbolic ref to it.

    Args:
      refs: Refs container to update
      head_ref: Ref that HEAD should refer to
      ref_message: Reflog message for the HEAD update

    Returns:
      The value HEAD was set to, or None if a non-tag head_ref could not
      be looked up (KeyError)
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            return None
        return target

    # detach HEAD at specified tag
    target = refs[head_ref]
    # NOTE(review): other code in this file treats refs[...] as a SHA, so
    # this isinstance(Tag) branch may never run -- confirm.
    if isinstance(target, Tag):
        _cls, obj = target.object
        target = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, target, message=ref_message)
    return target

1694 

1695 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, Optional[bytes]],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import a remote's branches and tags into a refs container.

    Branches are imported under refs/remotes/<remote_name> and tags under
    the local tag prefix. Peeled entries and refs without a value are
    ignored.

    Args:
      refs_container: Refs container to import into
      remote_name: Name of the remote
      refs: Refs advertised by the remote
      message: Optional reflog message
      prune: Passed to import_refs for the remote-tracking branches
      prune_tags: Passed to import_refs for the tags
    """
    stripped_refs = strip_peeled_refs(refs)

    branches = {}
    for ref, sha in stripped_refs.items():
        if ref.startswith(LOCAL_BRANCH_PREFIX) and sha is not None:
            branches[extract_branch_name(ref)] = sha
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        branches,
        message=message,
        prune=prune,
    )

    tags = {}
    for ref, sha in stripped_refs.items():
        if not ref.startswith(LOCAL_TAG_PREFIX):
            continue
        if ref.endswith(PEELED_TAG_SUFFIX) or sha is None:
            continue
        tags[extract_tag_name(ref)] = sha
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )

1726 

1727 

def serialize_refs(
    store: ObjectContainer, refs: Mapping[bytes, bytes]
) -> dict[bytes, bytes]:
    """Build a ref dictionary that also carries peeled entries.

    Refs whose SHA cannot be peeled because it is missing from the store
    are skipped with a UserWarning.

    Args:
      store: Object store to peel refs from
      refs: Dictionary of ref names to SHAs

    Returns:
      Dictionary with refs and peeled refs (marked with ^{})
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for ref, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            ref_text = ref.decode("utf-8", "replace")
            sha_text = sha.decode("ascii")
            warnings.warn(
                f"ref {ref_text} points at non-present sha {sha_text}",
                UserWarning,
            )
            continue
        # Only tags get an additional peeled entry.
        if isinstance(unpeeled, Tag):
            serialized[ref + PEELED_TAG_SUFFIX] = peeled.id
        serialized[ref] = unpeeled.id
    return serialized

1760 

1761 

class locked_ref:
    """Context manager that holds the lock on a ref while it is modified.

    On entry the ref's file is opened for writing through GitFile. On
    exit the pending write is closed, unless an exception occurred or the
    ref was deleted, in which case it is aborted.
    """

    def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
        """Initialize a locked ref.

        Args:
          refs_container: The DiskRefsContainer to lock the ref in
          refname: The ref name to lock
        """
        self._refs_container = refs_container
        self._refname = refname
        # Set by __enter__: the open lock file and the resolved ref name.
        self._file: Optional[_GitFile] = None
        self._realname: Optional[Ref] = None
        # True once delete() has run and no later set() replaced it.
        self._deleted = False

    def __enter__(self) -> "locked_ref":
        """Enter the context manager and acquire the lock.

        Returns:
          This locked_ref instance

        Raises:
          OSError: If the lock cannot be acquired
        """
        container = self._refs_container
        container._check_refname(self._refname)
        # Lock the ref at the end of the symref chain; fall back to the
        # given name if the chain cannot be followed.
        try:
            chain, _ = container.follow(self._refname)
            self._realname = chain[-1]
        except (KeyError, IndexError, SymrefLoop):
            self._realname = self._refname

        path = container.refpath(self._realname)
        ensure_dir_exists(os.path.dirname(path))
        self._file = GitFile(path, "wb")
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        """Exit the context manager and release the lock.

        Args:
          exc_type: Type of exception if one occurred
          exc_value: Exception instance if one occurred
          traceback: Traceback if an exception occurred
        """
        if not self._file:
            return
        if exc_type is None and not self._deleted:
            self._file.close()
        else:
            self._file.abort()

    def _locked_file(self) -> "_GitFile":
        """Return the open lock file, or fail when not inside the context."""
        if not self._file:
            raise RuntimeError("locked_ref not in context")
        return self._file

    def get(self) -> Optional[bytes]:
        """Get the current value of the ref."""
        self._locked_file()

        assert self._realname is not None
        container = self._refs_container
        value = container.read_loose_ref(self._realname)
        if value is None:
            # No loose ref; fall back to packed refs.
            value = container.get_packed_refs().get(self._realname, None)
        return value

    def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
        """Ensure the ref currently equals the expected value.

        Args:
          expected_value: The expected current value of the ref
        Returns:
          True if the ref equals the expected value, False otherwise
        """
        return self.get() == expected_value

    def set(self, new_ref: bytes) -> None:
        """Set the ref to a new value.

        Args:
          new_ref: The new SHA1 or symbolic ref value
        """
        lock = self._locked_file()

        if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
            raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

        # Discard anything written earlier in this context.
        lock.seek(0)
        lock.truncate()
        lock.write(new_ref + b"\n")
        self._deleted = False

    def set_symbolic_ref(self, target: Ref) -> None:
        """Make this ref point at another ref.

        Args:
          target: Name of the ref to point at
        """
        lock = self._locked_file()

        self._refs_container._check_refname(target)
        # Discard anything written earlier in this context.
        lock.seek(0)
        lock.truncate()
        lock.write(SYMREF + target + b"\n")
        self._deleted = False

    def delete(self) -> None:
        """Delete the ref file while holding the lock."""
        self._locked_file()

        # Delete the actual ref file while holding the lock
        if self._realname:
            path = self._refs_container.refpath(self._realname)
            with suppress(FileNotFoundError):
                if os.path.lexists(path):
                    os.remove(path)
            self._refs_container._remove_packed_ref(self._realname)

        self._deleted = True

1894 

1895 

class NamespacedRefsContainer(RefsContainer):
    """Refs container view restricted to a single namespace.

    This implements Git's GIT_NAMESPACE feature, which stores refs under
    refs/namespaces/<namespace>/ and filters operations to only show refs
    within that namespace.

    Example:
      With namespace "foo", a ref "refs/heads/master" is stored as
      "refs/namespaces/foo/refs/heads/master" in the underlying container.
    """

    def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
        """Initialize NamespacedRefsContainer.

        Args:
          refs: The underlying refs container to wrap
          namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
        """
        super().__init__(logger=refs._logger)
        self._refs = refs
        # Build namespace prefix: refs/namespaces/<namespace>/
        # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
        self._namespace_prefix = b"".join(
            b"refs/namespaces/" + part + b"/" for part in namespace.split(b"/")
        )

    def _apply_namespace(self, name: bytes) -> bytes:
        """Apply namespace prefix to a ref name."""
        if name != HEADREF and name.startswith(b"refs/"):
            return self._namespace_prefix + name
        # HEAD and other special refs are not namespaced
        return name

    def _strip_namespace(self, name: bytes) -> Optional[bytes]:
        """Remove namespace prefix from a ref name.

        Returns None if the ref is not in our namespace.
        """
        # HEAD and other special refs are not namespaced
        if name == HEADREF or not name.startswith(b"refs/"):
            return name
        prefix = self._namespace_prefix
        return name[len(prefix) :] if name.startswith(prefix) else None

    def allkeys(self) -> set[bytes]:
        """Return all reference keys in this namespace."""
        candidates = (self._strip_namespace(key) for key in self._refs.allkeys())
        return {key for key in candidates if key is not None}

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
        """Read a loose reference."""
        namespaced = self._apply_namespace(name)
        return self._refs.read_loose_ref(namespaced)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get packed refs within this namespace."""
        visible = {}
        for name, value in self._refs.get_packed_refs().items():
            local_name = self._strip_namespace(name)
            if local_name is None:
                # Belongs to another namespace (or to none).
                continue
            visible[local_name] = value
        return visible

    def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
        """Add packed refs with namespace prefix."""
        namespaced_refs = {}
        for name, value in new_refs.items():
            namespaced_refs[self._apply_namespace(name)] = value
        self._refs.add_packed_refs(namespaced_refs)

    def get_peeled(self, name: bytes) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref."""
        namespaced = self._apply_namespace(name)
        return self._refs.get_peeled(namespaced)

    def set_symbolic_ref(
        self,
        name: bytes,
        other: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> None:
        """Make a ref point at another ref."""
        # Both the symref and its target are mapped into the namespace.
        source = self._apply_namespace(name)
        target = self._apply_namespace(other)
        self._refs.set_symbolic_ref(
            source,
            target,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref."""
        namespaced = self._apply_namespace(name)
        return self._refs.set_if_equals(
            namespaced,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist."""
        namespaced = self._apply_namespace(name)
        return self._refs.add_if_new(
            namespaced,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def remove_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref."""
        namespaced = self._apply_namespace(name)
        return self._refs.remove_if_equals(
            namespaced,
            old_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Note: This packs all refs in the underlying container, not just
        those in the namespace.
        """
        self._refs.pack_refs(all=all)

2061 

2062 

def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
    """Filter refs to only include those with a given prefix.

    Args:
      refs: A dictionary of refs.
      prefixes: The prefixes to filter by. Any iterable is accepted,
        including one-shot iterators and generators.

    Returns:
      New dictionary with the refs whose name starts with at least one of
      the prefixes. Empty if no prefixes are given.
    """
    # Materialize once: the prefixes are needed for every ref, and a
    # one-shot iterator would otherwise be exhausted after the first ref.
    wanted = tuple(prefixes)
    return {name: sha for name, sha in refs.items() if name.startswith(wanted)}

2072 

2073 

def is_per_worktree_ref(ref: bytes) -> bool:
    """Tell whether a reference is stored per worktree rather than shared.

    Per-worktree references are:
    - all pseudorefs, e.g. HEAD
    - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"

    All refs starting with "refs/" are shared, except for the ones listed above.

    See https://git-scm.com/docs/git-worktree#_refs.
    """
    if not ref.startswith(b"refs/"):
        # Anything outside refs/ (pseudorefs such as HEAD) is per worktree.
        return True
    per_worktree_dirs = (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
    return ref.startswith(per_worktree_dirs)