Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

756 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25__all__ = [ 

26 "HEADREF", 

27 "LOCAL_BRANCH_PREFIX", 

28 "LOCAL_NOTES_PREFIX", 

29 "LOCAL_REMOTE_PREFIX", 

30 "LOCAL_REPLACE_PREFIX", 

31 "LOCAL_TAG_PREFIX", 

32 "SYMREF", 

33 "DictRefsContainer", 

34 "DiskRefsContainer", 

35 "NamespacedRefsContainer", 

36 "Ref", 

37 "RefsContainer", 

38 "SymrefLoop", 

39 "check_ref_format", 

40 "extract_branch_name", 

41 "extract_tag_name", 

42 "filter_ref_prefix", 

43 "is_local_branch", 

44 "is_per_worktree_ref", 

45 "local_branch_name", 

46 "local_replace_name", 

47 "local_tag_name", 

48 "parse_remote_ref", 

49 "parse_symref_value", 

50 "read_info_refs", 

51 "read_packed_refs", 

52 "read_packed_refs_with_peeled", 

53 "set_ref_from_raw", 

54 "shorten_ref_name", 

55 "write_packed_refs", 

56] 

57 

58import os 

59import types 

60from collections.abc import Callable, Iterable, Iterator, Mapping 

61from contextlib import suppress 

62from typing import ( 

63 IO, 

64 TYPE_CHECKING, 

65 Any, 

66 BinaryIO, 

67 NewType, 

68 TypeVar, 

69) 

70 

71if TYPE_CHECKING: 

72 from .file import _GitFile 

73 

74from .errors import PackedRefsException, RefFormatError 

75from .file import GitFile, ensure_dir_exists 

76from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

77 

78Ref = NewType("Ref", bytes) 

79 

80T = TypeVar("T", dict[Ref, ObjectID], dict[Ref, ObjectID | None]) 

81 

82HEADREF = Ref(b"HEAD") 

83SYMREF = b"ref: " 

84LOCAL_BRANCH_PREFIX = b"refs/heads/" 

85LOCAL_TAG_PREFIX = b"refs/tags/" 

86LOCAL_REMOTE_PREFIX = b"refs/remotes/" 

87LOCAL_NOTES_PREFIX = b"refs/notes/" 

88LOCAL_REPLACE_PREFIX = b"refs/replace/" 

89BAD_REF_CHARS: set[int] = set(b"\177 ~^:?*[") 

90 

91 

92class SymrefLoop(Exception): 

93 """There is a loop between one or more symrefs.""" 

94 

95 def __init__(self, ref: bytes, depth: int) -> None: 

96 """Initialize SymrefLoop exception.""" 

97 self.ref = ref 

98 self.depth = depth 

99 

100 

101def parse_symref_value(contents: bytes) -> bytes: 

102 """Parse a symref value. 

103 

104 Args: 

105 contents: Contents to parse 

106 Returns: Destination 

107 """ 

108 if contents.startswith(SYMREF): 

109 return contents[len(SYMREF) :].rstrip(b"\r\n") 

110 raise ValueError(contents) 

111 

112 

113def check_ref_format(refname: Ref) -> bool: 

114 """Check if a refname is correctly formatted. 

115 

116 Implements all the same rules as git-check-ref-format[1]. 

117 

118 [1] 

119 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html 

120 

121 Args: 

122 refname: The refname to check 

123 Returns: True if refname is valid, False otherwise 

124 """ 

125 # These could be combined into one big expression, but are listed 

126 # separately to parallel [1]. 

127 if b"/." in refname or refname.startswith(b"."): # type: ignore[comparison-overlap] 

128 return False 

129 if b"/" not in refname: # type: ignore[comparison-overlap] 

130 return False 

131 if b".." in refname: # type: ignore[comparison-overlap] 

132 return False 

133 for i, c in enumerate(refname): 

134 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS: 

135 return False 

136 if refname[-1] in b"/.": 

137 return False 

138 if refname.endswith(b".lock"): 

139 return False 

140 if b"@{" in refname: # type: ignore[comparison-overlap] 

141 return False 

142 if b"\\" in refname: # type: ignore[comparison-overlap] 

143 return False 

144 return True 

145 

146 

147def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]: 

148 """Parse a remote ref into remote name and branch name. 

149 

150 Args: 

151 ref: Remote ref like b"refs/remotes/origin/main" 

152 

153 Returns: 

154 Tuple of (remote_name, branch_name) 

155 

156 Raises: 

157 ValueError: If ref is not a valid remote ref 

158 """ 

159 if not ref.startswith(LOCAL_REMOTE_PREFIX): 

160 raise ValueError(f"Not a remote ref: {ref!r}") 

161 

162 # Remove the prefix 

163 remainder = ref[len(LOCAL_REMOTE_PREFIX) :] 

164 

165 # Split into remote name and branch name 

166 parts = remainder.split(b"/", 1) 

167 if len(parts) != 2: 

168 raise ValueError(f"Invalid remote ref format: {ref!r}") 

169 

170 remote_name, branch_name = parts 

171 return (remote_name, branch_name) 

172 

173 

174def set_ref_from_raw(refs: "RefsContainer", name: Ref, raw_ref: bytes) -> None: 

175 """Set a reference from a raw ref value. 

176 

177 This handles both symbolic refs (starting with 'ref: ') and direct ObjectID refs. 

178 

179 Args: 

180 refs: The RefsContainer to set the ref in 

181 name: The ref name to set 

182 raw_ref: The raw ref value (either a symbolic ref or an ObjectID) 

183 """ 

184 if raw_ref.startswith(SYMREF): 

185 # It's a symbolic ref 

186 target = Ref(raw_ref[len(SYMREF) :]) 

187 refs.set_symbolic_ref(name, target) 

188 else: 

189 # It's a direct ObjectID 

190 refs[name] = ObjectID(raw_ref) 

191 

192 

193class RefsContainer: 

194 """A container for refs.""" 

195 

196 def __init__( 

197 self, 

198 logger: Callable[ 

199 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

200 ] 

201 | None = None, 

202 ) -> None: 

203 """Initialize RefsContainer with optional logger function.""" 

204 self._logger = logger 

205 

206 def _log( 

207 self, 

208 ref: bytes, 

209 old_sha: bytes | None, 

210 new_sha: bytes | None, 

211 committer: bytes | None = None, 

212 timestamp: int | None = None, 

213 timezone: int | None = None, 

214 message: bytes | None = None, 

215 ) -> None: 

216 if self._logger is None: 

217 return 

218 if message is None: 

219 return 

220 # Use ZERO_SHA for None values, matching git behavior 

221 if old_sha is None: 

222 old_sha = ZERO_SHA 

223 if new_sha is None: 

224 new_sha = ZERO_SHA 

225 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message) 

226 

227 def set_symbolic_ref( 

228 self, 

229 name: Ref, 

230 other: Ref, 

231 committer: bytes | None = None, 

232 timestamp: int | None = None, 

233 timezone: int | None = None, 

234 message: bytes | None = None, 

235 ) -> None: 

236 """Make a ref point at another ref. 

237 

238 Args: 

239 name: Name of the ref to set 

240 other: Name of the ref to point at 

241 committer: Optional committer name/email 

242 timestamp: Optional timestamp 

243 timezone: Optional timezone 

244 message: Optional message 

245 """ 

246 raise NotImplementedError(self.set_symbolic_ref) 

247 

248 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

249 """Get contents of the packed-refs file. 

250 

251 Returns: Dictionary mapping ref names to SHA1s 

252 

253 Note: Will return an empty dictionary when no packed-refs file is 

254 present. 

255 """ 

256 raise NotImplementedError(self.get_packed_refs) 

257 

258 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

259 """Add the given refs as packed refs. 

260 

261 Args: 

262 new_refs: A mapping of ref names to targets; if a target is None that 

263 means remove the ref 

264 """ 

265 raise NotImplementedError(self.add_packed_refs) 

266 

267 def get_peeled(self, name: Ref) -> ObjectID | None: 

268 """Return the cached peeled value of a ref, if available. 

269 

270 Args: 

271 name: Name of the ref to peel 

272 Returns: The peeled value of the ref. If the ref is known not point to 

273 a tag, this will be the SHA the ref refers to. If the ref may point 

274 to a tag, but no cached information is available, None is returned. 

275 """ 

276 return None 

277 

278 def import_refs( 

279 self, 

280 base: Ref, 

281 other: Mapping[Ref, ObjectID | None], 

282 committer: bytes | None = None, 

283 timestamp: bytes | None = None, 

284 timezone: bytes | None = None, 

285 message: bytes | None = None, 

286 prune: bool = False, 

287 ) -> None: 

288 """Import refs from another repository. 

289 

290 Args: 

291 base: Base ref to import into (e.g., b'refs/remotes/origin') 

292 other: Dictionary of refs to import 

293 committer: Optional committer for reflog 

294 timestamp: Optional timestamp for reflog 

295 timezone: Optional timezone for reflog 

296 message: Optional message for reflog 

297 prune: If True, remove refs not in other 

298 """ 

299 if prune: 

300 to_delete = set(self.subkeys(base)) 

301 else: 

302 to_delete = set() 

303 for name, value in other.items(): 

304 if value is None: 

305 to_delete.add(name) 

306 else: 

307 self.set_if_equals( 

308 Ref(b"/".join((base, name))), None, value, message=message 

309 ) 

310 if to_delete: 

311 try: 

312 to_delete.remove(name) 

313 except KeyError: 

314 pass 

315 for ref in to_delete: 

316 self.remove_if_equals(Ref(b"/".join((base, ref))), None, message=message) 

317 

318 def allkeys(self) -> set[Ref]: 

319 """All refs present in this container.""" 

320 raise NotImplementedError(self.allkeys) 

321 

322 def __iter__(self) -> Iterator[Ref]: 

323 """Iterate over all reference keys.""" 

324 return iter(self.allkeys()) 

325 

326 def keys(self, base: Ref | None = None) -> set[Ref]: 

327 """Refs present in this container. 

328 

329 Args: 

330 base: An optional base to return refs under. 

331 Returns: An unsorted set of valid refs in this container, including 

332 packed refs. 

333 """ 

334 if base is not None: 

335 return self.subkeys(base) 

336 else: 

337 return self.allkeys() 

338 

339 def subkeys(self, base: Ref) -> set[Ref]: 

340 """Refs present in this container under a base. 

341 

342 Args: 

343 base: The base to return refs under. 

344 Returns: A set of valid refs in this container under the base; the base 

345 prefix is stripped from the ref names returned. 

346 """ 

347 keys: set[Ref] = set() 

348 base_len = len(base) + 1 

349 for refname in self.allkeys(): 

350 if refname.startswith(base): 

351 keys.add(Ref(refname[base_len:])) 

352 return keys 

353 

354 def as_dict(self, base: Ref | None = None) -> dict[Ref, ObjectID]: 

355 """Return the contents of this container as a dictionary.""" 

356 ret: dict[Ref, ObjectID] = {} 

357 keys = self.keys(base) 

358 base_bytes: bytes 

359 if base is None: 

360 base_bytes = b"" 

361 else: 

362 base_bytes = base.rstrip(b"/") 

363 for key in keys: 

364 try: 

365 ret[key] = self[Ref((base_bytes + b"/" + key).strip(b"/"))] 

366 except (SymrefLoop, KeyError): 

367 continue # Unable to resolve 

368 

369 return ret 

370 

371 def _check_refname(self, name: Ref) -> None: 

372 """Ensure a refname is valid and lives in refs or is HEAD. 

373 

374 HEAD is not a valid refname according to git-check-ref-format, but this 

375 class needs to be able to touch HEAD. Also, check_ref_format expects 

376 refnames without the leading 'refs/', but this class requires that 

377 so it cannot touch anything outside the refs dir (or HEAD). 

378 

379 Args: 

380 name: The name of the reference. 

381 

382 Raises: 

383 KeyError: if a refname is not HEAD or is otherwise not valid. 

384 """ 

385 if name in (HEADREF, Ref(b"refs/stash")): 

386 return 

387 if not name.startswith(b"refs/") or not check_ref_format(Ref(name[5:])): 

388 raise RefFormatError(name) 

389 

390 def read_ref(self, refname: Ref) -> bytes | None: 

391 """Read a reference without following any references. 

392 

393 Args: 

394 refname: The name of the reference 

395 Returns: The contents of the ref file, or None if it does 

396 not exist. 

397 """ 

398 contents = self.read_loose_ref(refname) 

399 if not contents: 

400 contents = self.get_packed_refs().get(refname, None) 

401 return contents 

402 

403 def read_loose_ref(self, name: Ref) -> bytes | None: 

404 """Read a loose reference and return its contents. 

405 

406 Args: 

407 name: the refname to read 

408 Returns: The contents of the ref file, or None if it does 

409 not exist. 

410 """ 

411 raise NotImplementedError(self.read_loose_ref) 

412 

413 def follow(self, name: Ref) -> tuple[list[Ref], ObjectID | None]: 

414 """Follow a reference name. 

415 

416 Returns: a tuple of (refnames, sha), wheres refnames are the names of 

417 references in the chain 

418 """ 

419 contents: bytes | None = SYMREF + name 

420 depth = 0 

421 refnames: list[Ref] = [] 

422 while contents and contents.startswith(SYMREF): 

423 refname = Ref(contents[len(SYMREF) :]) 

424 refnames.append(refname) 

425 contents = self.read_ref(refname) 

426 if not contents: 

427 break 

428 depth += 1 

429 if depth > 5: 

430 raise SymrefLoop(name, depth) 

431 return refnames, ObjectID(contents) if contents else None 

432 

433 def __contains__(self, refname: Ref) -> bool: 

434 """Check if a reference exists.""" 

435 if self.read_ref(refname): 

436 return True 

437 return False 

438 

439 def __getitem__(self, name: Ref) -> ObjectID: 

440 """Get the SHA1 for a reference name. 

441 

442 This method follows all symbolic references. 

443 """ 

444 _, sha = self.follow(name) 

445 if sha is None: 

446 raise KeyError(name) 

447 return sha 

448 

449 def set_if_equals( 

450 self, 

451 name: Ref, 

452 old_ref: ObjectID | None, 

453 new_ref: ObjectID, 

454 committer: bytes | None = None, 

455 timestamp: int | None = None, 

456 timezone: int | None = None, 

457 message: bytes | None = None, 

458 ) -> bool: 

459 """Set a refname to new_ref only if it currently equals old_ref. 

460 

461 This method follows all symbolic references if applicable for the 

462 subclass, and can be used to perform an atomic compare-and-swap 

463 operation. 

464 

465 Args: 

466 name: The refname to set. 

467 old_ref: The old sha the refname must refer to, or None to set 

468 unconditionally. 

469 new_ref: The new sha the refname will refer to. 

470 committer: Optional committer name/email 

471 timestamp: Optional timestamp 

472 timezone: Optional timezone 

473 message: Message for reflog 

474 Returns: True if the set was successful, False otherwise. 

475 """ 

476 raise NotImplementedError(self.set_if_equals) 

477 

478 def add_if_new( 

479 self, 

480 name: Ref, 

481 ref: ObjectID, 

482 committer: bytes | None = None, 

483 timestamp: int | None = None, 

484 timezone: int | None = None, 

485 message: bytes | None = None, 

486 ) -> bool: 

487 """Add a new reference only if it does not already exist. 

488 

489 Args: 

490 name: Ref name 

491 ref: Ref value 

492 committer: Optional committer name/email 

493 timestamp: Optional timestamp 

494 timezone: Optional timezone 

495 message: Optional message for reflog 

496 """ 

497 raise NotImplementedError(self.add_if_new) 

498 

499 def __setitem__(self, name: Ref, ref: ObjectID) -> None: 

500 """Set a reference name to point to the given SHA1. 

501 

502 This method follows all symbolic references if applicable for the 

503 subclass. 

504 

505 Note: This method unconditionally overwrites the contents of a 

506 reference. To update atomically only if the reference has not 

507 changed, use set_if_equals(). 

508 

509 Args: 

510 name: The refname to set. 

511 ref: The new sha the refname will refer to. 

512 """ 

513 if not (valid_hexsha(ref) or ref.startswith(SYMREF)): 

514 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref") 

515 self.set_if_equals(name, None, ref) 

516 

517 def remove_if_equals( 

518 self, 

519 name: Ref, 

520 old_ref: ObjectID | None, 

521 committer: bytes | None = None, 

522 timestamp: int | None = None, 

523 timezone: int | None = None, 

524 message: bytes | None = None, 

525 ) -> bool: 

526 """Remove a refname only if it currently equals old_ref. 

527 

528 This method does not follow symbolic references, even if applicable for 

529 the subclass. It can be used to perform an atomic compare-and-delete 

530 operation. 

531 

532 Args: 

533 name: The refname to delete. 

534 old_ref: The old sha the refname must refer to, or None to 

535 delete unconditionally. 

536 committer: Optional committer name/email 

537 timestamp: Optional timestamp 

538 timezone: Optional timezone 

539 message: Message for reflog 

540 Returns: True if the delete was successful, False otherwise. 

541 """ 

542 raise NotImplementedError(self.remove_if_equals) 

543 

544 def __delitem__(self, name: Ref) -> None: 

545 """Remove a refname. 

546 

547 This method does not follow symbolic references, even if applicable for 

548 the subclass. 

549 

550 Note: This method unconditionally deletes the contents of a reference. 

551 To delete atomically only if the reference has not changed, use 

552 remove_if_equals(). 

553 

554 Args: 

555 name: The refname to delete. 

556 """ 

557 self.remove_if_equals(name, None) 

558 

559 def get_symrefs(self) -> dict[Ref, Ref]: 

560 """Get a dict with all symrefs in this container. 

561 

562 Returns: Dictionary mapping source ref to target ref 

563 """ 

564 ret: dict[Ref, Ref] = {} 

565 for src in self.allkeys(): 

566 try: 

567 ref_value = self.read_ref(src) 

568 assert ref_value is not None 

569 dst = parse_symref_value(ref_value) 

570 except ValueError: 

571 pass 

572 else: 

573 ret[src] = Ref(dst) 

574 return ret 

575 

576 def pack_refs(self, all: bool = False) -> None: 

577 """Pack loose refs into packed-refs file. 

578 

579 Args: 

580 all: If True, pack all refs. If False, only pack tags. 

581 """ 

582 raise NotImplementedError(self.pack_refs) 

583 

584 

585class DictRefsContainer(RefsContainer): 

586 """RefsContainer backed by a simple dict. 

587 

588 This container does not support symbolic or packed references and is not 

589 threadsafe. 

590 """ 

591 

592 def __init__( 

593 self, 

594 refs: dict[Ref, bytes], 

595 logger: Callable[ 

596 [ 

597 bytes, 

598 bytes | None, 

599 bytes | None, 

600 bytes | None, 

601 int | None, 

602 int | None, 

603 bytes | None, 

604 ], 

605 None, 

606 ] 

607 | None = None, 

608 ) -> None: 

609 """Initialize DictRefsContainer with refs dictionary and optional logger.""" 

610 super().__init__(logger=logger) 

611 self._refs = refs 

612 self._peeled: dict[Ref, ObjectID] = {} 

613 self._watchers: set[Any] = set() 

614 

615 def allkeys(self) -> set[Ref]: 

616 """Return all reference keys.""" 

617 return set(self._refs.keys()) 

618 

619 def read_loose_ref(self, name: Ref) -> bytes | None: 

620 """Read a loose reference.""" 

621 return self._refs.get(name, None) 

622 

623 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

624 """Get packed references.""" 

625 return {} 

626 

627 def _notify(self, ref: bytes, newsha: bytes | None) -> None: 

628 for watcher in self._watchers: 

629 watcher._notify((ref, newsha)) 

630 

631 def set_symbolic_ref( 

632 self, 

633 name: Ref, 

634 other: Ref, 

635 committer: bytes | None = None, 

636 timestamp: int | None = None, 

637 timezone: int | None = None, 

638 message: bytes | None = None, 

639 ) -> None: 

640 """Make a ref point at another ref. 

641 

642 Args: 

643 name: Name of the ref to set 

644 other: Name of the ref to point at 

645 committer: Optional committer name for reflog 

646 timestamp: Optional timestamp for reflog 

647 timezone: Optional timezone for reflog 

648 message: Optional message for reflog 

649 """ 

650 old = self.follow(name)[-1] 

651 new = SYMREF + other 

652 self._refs[name] = new 

653 self._notify(name, new) 

654 self._log( 

655 name, 

656 old, 

657 new, 

658 committer=committer, 

659 timestamp=timestamp, 

660 timezone=timezone, 

661 message=message, 

662 ) 

663 

664 def set_if_equals( 

665 self, 

666 name: Ref, 

667 old_ref: ObjectID | None, 

668 new_ref: ObjectID, 

669 committer: bytes | None = None, 

670 timestamp: int | None = None, 

671 timezone: int | None = None, 

672 message: bytes | None = None, 

673 ) -> bool: 

674 """Set a refname to new_ref only if it currently equals old_ref. 

675 

676 This method follows all symbolic references, and can be used to perform 

677 an atomic compare-and-swap operation. 

678 

679 Args: 

680 name: The refname to set. 

681 old_ref: The old sha the refname must refer to, or None to set 

682 unconditionally. 

683 new_ref: The new sha the refname will refer to. 

684 committer: Optional committer name for reflog 

685 timestamp: Optional timestamp for reflog 

686 timezone: Optional timezone for reflog 

687 message: Optional message for reflog 

688 

689 Returns: 

690 True if the set was successful, False otherwise. 

691 """ 

692 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

693 return False 

694 # Only update the specific ref requested, not the whole chain 

695 self._check_refname(name) 

696 old = self._refs.get(name) 

697 self._refs[name] = new_ref 

698 self._notify(name, new_ref) 

699 self._log( 

700 name, 

701 old, 

702 new_ref, 

703 committer=committer, 

704 timestamp=timestamp, 

705 timezone=timezone, 

706 message=message, 

707 ) 

708 return True 

709 

710 def add_if_new( 

711 self, 

712 name: Ref, 

713 ref: ObjectID, 

714 committer: bytes | None = None, 

715 timestamp: int | None = None, 

716 timezone: int | None = None, 

717 message: bytes | None = None, 

718 ) -> bool: 

719 """Add a new reference only if it does not already exist. 

720 

721 Args: 

722 name: Ref name 

723 ref: Ref value 

724 committer: Optional committer name for reflog 

725 timestamp: Optional timestamp for reflog 

726 timezone: Optional timezone for reflog 

727 message: Optional message for reflog 

728 

729 Returns: 

730 True if the add was successful, False otherwise. 

731 """ 

732 if name in self._refs: 

733 return False 

734 self._refs[name] = ref 

735 self._notify(name, ref) 

736 self._log( 

737 name, 

738 None, 

739 ref, 

740 committer=committer, 

741 timestamp=timestamp, 

742 timezone=timezone, 

743 message=message, 

744 ) 

745 return True 

746 

747 def remove_if_equals( 

748 self, 

749 name: Ref, 

750 old_ref: ObjectID | None, 

751 committer: bytes | None = None, 

752 timestamp: int | None = None, 

753 timezone: int | None = None, 

754 message: bytes | None = None, 

755 ) -> bool: 

756 """Remove a refname only if it currently equals old_ref. 

757 

758 This method does not follow symbolic references. It can be used to 

759 perform an atomic compare-and-delete operation. 

760 

761 Args: 

762 name: The refname to delete. 

763 old_ref: The old sha the refname must refer to, or None to 

764 delete unconditionally. 

765 committer: Optional committer name for reflog 

766 timestamp: Optional timestamp for reflog 

767 timezone: Optional timezone for reflog 

768 message: Optional message for reflog 

769 

770 Returns: 

771 True if the delete was successful, False otherwise. 

772 """ 

773 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

774 return False 

775 try: 

776 old = self._refs.pop(name) 

777 except KeyError: 

778 pass 

779 else: 

780 self._notify(name, None) 

781 self._log( 

782 name, 

783 old, 

784 None, 

785 committer=committer, 

786 timestamp=timestamp, 

787 timezone=timezone, 

788 message=message, 

789 ) 

790 return True 

791 

792 def get_peeled(self, name: Ref) -> ObjectID | None: 

793 """Get peeled version of a reference.""" 

794 return self._peeled.get(name) 

795 

796 def _update(self, refs: Mapping[Ref, ObjectID]) -> None: 

797 """Update multiple refs; intended only for testing.""" 

798 # TODO(dborowitz): replace this with a public function that uses 

799 # set_if_equal. 

800 for ref, sha in refs.items(): 

801 self.set_if_equals(ref, None, sha) 

802 

803 def _update_peeled(self, peeled: Mapping[Ref, ObjectID]) -> None: 

804 """Update cached peeled refs; intended only for testing.""" 

805 self._peeled.update(peeled) 

806 

807 

808class DiskRefsContainer(RefsContainer): 

809 """Refs container that reads refs from disk.""" 

810 

811 def __init__( 

812 self, 

813 path: str | bytes | os.PathLike[str], 

814 worktree_path: str | bytes | os.PathLike[str] | None = None, 

815 logger: Callable[ 

816 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

817 ] 

818 | None = None, 

819 ) -> None: 

820 """Initialize DiskRefsContainer.""" 

821 super().__init__(logger=logger) 

822 # Convert path-like objects to strings, then to bytes for Git compatibility 

823 self.path = os.fsencode(os.fspath(path)) 

824 if worktree_path is None: 

825 self.worktree_path = self.path 

826 else: 

827 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

828 self._packed_refs: dict[Ref, ObjectID] | None = None 

829 self._peeled_refs: dict[Ref, ObjectID] | None = None 

830 

831 def __repr__(self) -> str: 

832 """Return string representation of DiskRefsContainer.""" 

833 return f"{self.__class__.__name__}({self.path!r})" 

834 

835 def _iter_dir( 

836 self, 

837 path: bytes, 

838 base: bytes, 

839 dir_filter: Callable[[bytes], bool] | None = None, 

840 ) -> Iterator[Ref]: 

841 refspath = os.path.join(path, base.rstrip(b"/")) 

842 prefix_len = len(os.path.join(path, b"")) 

843 

844 for root, dirs, files in os.walk(refspath): 

845 directory = root[prefix_len:] 

846 if os.path.sep != "/": 

847 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

848 if dir_filter is not None: 

849 dirs[:] = [ 

850 d for d in dirs if dir_filter(b"/".join([directory, d, b""])) 

851 ] 

852 

853 for filename in files: 

854 refname = b"/".join([directory, filename]) 

855 if check_ref_format(Ref(refname)): 

856 yield Ref(refname) 

857 

858 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[Ref]: 

859 base = base.rstrip(b"/") + b"/" 

860 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = [] 

861 if base != b"refs/": 

862 path = self.worktree_path if is_per_worktree_ref(base) else self.path 

863 search_paths.append((path, None)) 

864 elif self.worktree_path == self.path: 

865 # Iterate through all the refs from the main worktree 

866 search_paths.append((self.path, None)) 

867 else: 

868 # Iterate through all the shared refs from the commondir, excluding per-worktree refs 

869 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r))) 

870 # Iterate through all the per-worktree refs from the worktree's gitdir 

871 search_paths.append((self.worktree_path, is_per_worktree_ref)) 

872 

873 for path, dir_filter in search_paths: 

874 yield from self._iter_dir(path, base, dir_filter=dir_filter) 

875 

876 def subkeys(self, base: Ref) -> set[Ref]: 

877 """Return subkeys under a given base reference path.""" 

878 subkeys: set[Ref] = set() 

879 

880 for key in self._iter_loose_refs(base): 

881 if key.startswith(base): 

882 subkeys.add(Ref(key[len(base) :].strip(b"/"))) 

883 

884 for key in self.get_packed_refs(): 

885 if key.startswith(base): 

886 subkeys.add(Ref(key[len(base) :].strip(b"/"))) 

887 return subkeys 

888 

889 def allkeys(self) -> set[Ref]: 

890 """Return all reference keys.""" 

891 allkeys: set[Ref] = set() 

892 if os.path.exists(self.refpath(HEADREF)): 

893 allkeys.add(Ref(HEADREF)) 

894 

895 allkeys.update(self._iter_loose_refs()) 

896 allkeys.update(self.get_packed_refs()) 

897 return allkeys 

898 

899 def refpath(self, name: bytes) -> bytes: 

900 """Return the disk path of a ref.""" 

901 path = name 

902 if os.path.sep != "/": 

903 path = path.replace(b"/", os.fsencode(os.path.sep)) 

904 

905 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path 

906 return os.path.join(root_dir, path) 

907 

908 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

909 """Get contents of the packed-refs file. 

910 

911 Returns: Dictionary mapping ref names to SHA1s 

912 

913 Note: Will return an empty dictionary when no packed-refs file is 

914 present. 

915 """ 

916 # TODO: invalidate the cache on repacking 

917 if self._packed_refs is None: 

918 # set both to empty because we want _peeled_refs to be 

919 # None if and only if _packed_refs is also None. 

920 self._packed_refs = {} 

921 self._peeled_refs = {} 

922 path = os.path.join(self.path, b"packed-refs") 

923 try: 

924 f = GitFile(path, "rb") 

925 except FileNotFoundError: 

926 return {} 

927 with f: 

928 first_line = next(iter(f)).rstrip() 

929 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

930 for sha, name, peeled in read_packed_refs_with_peeled(f): 

931 self._packed_refs[name] = sha 

932 if peeled: 

933 self._peeled_refs[name] = peeled 

934 else: 

935 f.seek(0) 

936 for sha, name in read_packed_refs(f): 

937 self._packed_refs[name] = sha 

938 return self._packed_refs 

939 

940 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

941 """Add the given refs as packed refs. 

942 

943 Args: 

944 new_refs: A mapping of ref names to targets; if a target is None that 

945 means remove the ref 

946 """ 

947 if not new_refs: 

948 return 

949 

950 path = os.path.join(self.path, b"packed-refs") 

951 

952 with GitFile(path, "wb") as f: 

953 # reread cached refs from disk, while holding the lock 

954 packed_refs = self.get_packed_refs().copy() 

955 

956 for ref, target in new_refs.items(): 

957 # sanity check 

958 if ref == HEADREF: 

959 raise ValueError("cannot pack HEAD") 

960 

961 # remove any loose refs pointing to this one -- please 

962 # note that this bypasses remove_if_equals as we don't 

963 # want to affect packed refs in here 

964 with suppress(OSError): 

965 os.remove(self.refpath(ref)) 

966 

967 if target is not None: 

968 packed_refs[ref] = target 

969 else: 

970 packed_refs.pop(ref, None) 

971 

972 write_packed_refs(f, packed_refs, self._peeled_refs) 

973 

974 self._packed_refs = packed_refs 

975 

976 def get_peeled(self, name: Ref) -> ObjectID | None: 

977 """Return the cached peeled value of a ref, if available. 

978 

979 Args: 

980 name: Name of the ref to peel 

981 Returns: The peeled value of the ref. If the ref is known not point to 

982 a tag, this will be the SHA the ref refers to. If the ref may point 

983 to a tag, but no cached information is available, None is returned. 

984 """ 

985 self.get_packed_refs() 

986 if ( 

987 self._peeled_refs is None 

988 or self._packed_refs is None 

989 or name not in self._packed_refs 

990 ): 

991 # No cache: no peeled refs were read, or this ref is loose 

992 return None 

993 if name in self._peeled_refs: 

994 return self._peeled_refs[name] 

995 else: 

996 # Known not peelable 

997 return self[name] 

998 

999 def read_loose_ref(self, name: Ref) -> bytes | None: 

1000 """Read a reference file and return its contents. 

1001 

1002 If the reference file a symbolic reference, only read the first line of 

1003 the file. Otherwise, read the hash (40 bytes for SHA1, 64 bytes for SHA256). 

1004 

1005 Args: 

1006 name: the refname to read, relative to refpath 

1007 Returns: The contents of the ref file, or None if the file does not 

1008 exist. 

1009 

1010 Raises: 

1011 IOError: if any other error occurs 

1012 """ 

1013 filename = self.refpath(name) 

1014 try: 

1015 with GitFile(filename, "rb") as f: 

1016 header = f.read(len(SYMREF)) 

1017 if header == SYMREF: 

1018 # Read only the first line 

1019 return header + next(iter(f)).rstrip(b"\r\n") 

1020 else: 

1021 # Read the entire line to get the full hash (handles both SHA1 and SHA256) 

1022 f.seek(0) 

1023 line = f.readline().rstrip(b"\r\n") 

1024 return line 

1025 except (OSError, UnicodeError): 

1026 # don't assume anything specific about the error; in 

1027 # particular, invalid or forbidden paths can raise weird 

1028 # errors depending on the specific operating system 

1029 return None 

1030 

1031 def _remove_packed_ref(self, name: Ref) -> None: 

1032 if self._packed_refs is None: 

1033 return 

1034 filename = os.path.join(self.path, b"packed-refs") 

1035 # reread cached refs from disk, while holding the lock 

1036 f = GitFile(filename, "wb") 

1037 try: 

1038 self._packed_refs = None 

1039 self.get_packed_refs() 

1040 

1041 if self._packed_refs is None or name not in self._packed_refs: 

1042 f.abort() 

1043 return 

1044 

1045 del self._packed_refs[name] 

1046 if self._peeled_refs is not None: 

1047 with suppress(KeyError): 

1048 del self._peeled_refs[name] 

1049 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

1050 f.close() 

1051 except BaseException: 

1052 f.abort() 

1053 raise 

1054 

1055 def set_symbolic_ref( 

1056 self, 

1057 name: Ref, 

1058 other: Ref, 

1059 committer: bytes | None = None, 

1060 timestamp: int | None = None, 

1061 timezone: int | None = None, 

1062 message: bytes | None = None, 

1063 ) -> None: 

1064 """Make a ref point at another ref. 

1065 

1066 Args: 

1067 name: Name of the ref to set 

1068 other: Name of the ref to point at 

1069 committer: Optional committer name 

1070 timestamp: Optional timestamp 

1071 timezone: Optional timezone 

1072 message: Optional message to describe the change 

1073 """ 

1074 self._check_refname(name) 

1075 self._check_refname(other) 

1076 filename = self.refpath(name) 

1077 f = GitFile(filename, "wb") 

1078 try: 

1079 f.write(SYMREF + other + b"\n") 

1080 sha = self.follow(name)[-1] 

1081 self._log( 

1082 name, 

1083 sha, 

1084 sha, 

1085 committer=committer, 

1086 timestamp=timestamp, 

1087 timezone=timezone, 

1088 message=message, 

1089 ) 

1090 except BaseException: 

1091 f.abort() 

1092 raise 

1093 else: 

1094 f.close() 

1095 

1096 def set_if_equals( 

1097 self, 

1098 name: Ref, 

1099 old_ref: ObjectID | None, 

1100 new_ref: ObjectID, 

1101 committer: bytes | None = None, 

1102 timestamp: int | None = None, 

1103 timezone: int | None = None, 

1104 message: bytes | None = None, 

1105 ) -> bool: 

1106 """Set a refname to new_ref only if it currently equals old_ref. 

1107 

1108 This method follows all symbolic references, and can be used to perform 

1109 an atomic compare-and-swap operation. 

1110 

1111 Args: 

1112 name: The refname to set. 

1113 old_ref: The old sha the refname must refer to, or None to set 

1114 unconditionally. 

1115 new_ref: The new sha the refname will refer to. 

1116 committer: Optional committer name 

1117 timestamp: Optional timestamp 

1118 timezone: Optional timezone 

1119 message: Set message for reflog 

1120 Returns: True if the set was successful, False otherwise. 

1121 """ 

1122 self._check_refname(name) 

1123 try: 

1124 realnames, _ = self.follow(name) 

1125 realname = realnames[-1] 

1126 except (KeyError, IndexError, SymrefLoop): 

1127 realname = name 

1128 filename = self.refpath(realname) 

1129 

1130 # make sure none of the ancestor folders is in packed refs 

1131 probe_ref = Ref(os.path.dirname(realname)) 

1132 packed_refs = self.get_packed_refs() 

1133 while probe_ref: 

1134 if packed_refs.get(probe_ref, None) is not None: 

1135 raise NotADirectoryError(filename) 

1136 probe_ref = Ref(os.path.dirname(probe_ref)) 

1137 

1138 ensure_dir_exists(os.path.dirname(filename)) 

1139 with GitFile(filename, "wb") as f: 

1140 if old_ref is not None: 

1141 try: 

1142 # read again while holding the lock to handle race conditions 

1143 orig_ref = self.read_loose_ref(realname) 

1144 if orig_ref is None: 

1145 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA) 

1146 if orig_ref != old_ref: 

1147 f.abort() 

1148 return False 

1149 except OSError: 

1150 f.abort() 

1151 raise 

1152 

1153 # Check if ref already has the desired value while holding the lock 

1154 # This avoids fsync when ref is unchanged but still detects lock conflicts 

1155 current_ref = self.read_loose_ref(realname) 

1156 if current_ref is None: 

1157 current_ref = packed_refs.get(realname, None) 

1158 

1159 if current_ref is not None and current_ref == new_ref: 

1160 # Ref already has desired value, abort write to avoid fsync 

1161 f.abort() 

1162 return True 

1163 

1164 try: 

1165 f.write(new_ref + b"\n") 

1166 except OSError: 

1167 f.abort() 

1168 raise 

1169 self._log( 

1170 realname, 

1171 old_ref, 

1172 new_ref, 

1173 committer=committer, 

1174 timestamp=timestamp, 

1175 timezone=timezone, 

1176 message=message, 

1177 ) 

1178 return True 

1179 

1180 def add_if_new( 

1181 self, 

1182 name: Ref, 

1183 ref: ObjectID, 

1184 committer: bytes | None = None, 

1185 timestamp: int | None = None, 

1186 timezone: int | None = None, 

1187 message: bytes | None = None, 

1188 ) -> bool: 

1189 """Add a new reference only if it does not already exist. 

1190 

1191 This method follows symrefs, and only ensures that the last ref in the 

1192 chain does not exist. 

1193 

1194 Args: 

1195 name: The refname to set. 

1196 ref: The new sha the refname will refer to. 

1197 committer: Optional committer name 

1198 timestamp: Optional timestamp 

1199 timezone: Optional timezone 

1200 message: Optional message for reflog 

1201 Returns: True if the add was successful, False otherwise. 

1202 """ 

1203 try: 

1204 realnames, contents = self.follow(name) 

1205 if contents is not None: 

1206 return False 

1207 realname = realnames[-1] 

1208 except (KeyError, IndexError): 

1209 realname = name 

1210 self._check_refname(realname) 

1211 filename = self.refpath(realname) 

1212 ensure_dir_exists(os.path.dirname(filename)) 

1213 with GitFile(filename, "wb") as f: 

1214 if os.path.exists(filename) or name in self.get_packed_refs(): 

1215 f.abort() 

1216 return False 

1217 try: 

1218 f.write(ref + b"\n") 

1219 except OSError: 

1220 f.abort() 

1221 raise 

1222 else: 

1223 self._log( 

1224 name, 

1225 None, 

1226 ref, 

1227 committer=committer, 

1228 timestamp=timestamp, 

1229 timezone=timezone, 

1230 message=message, 

1231 ) 

1232 return True 

1233 

1234 def remove_if_equals( 

1235 self, 

1236 name: Ref, 

1237 old_ref: ObjectID | None, 

1238 committer: bytes | None = None, 

1239 timestamp: int | None = None, 

1240 timezone: int | None = None, 

1241 message: bytes | None = None, 

1242 ) -> bool: 

1243 """Remove a refname only if it currently equals old_ref. 

1244 

1245 This method does not follow symbolic references. It can be used to 

1246 perform an atomic compare-and-delete operation. 

1247 

1248 Args: 

1249 name: The refname to delete. 

1250 old_ref: The old sha the refname must refer to, or None to 

1251 delete unconditionally. 

1252 committer: Optional committer name 

1253 timestamp: Optional timestamp 

1254 timezone: Optional timezone 

1255 message: Optional message 

1256 Returns: True if the delete was successful, False otherwise. 

1257 """ 

1258 self._check_refname(name) 

1259 filename = self.refpath(name) 

1260 ensure_dir_exists(os.path.dirname(filename)) 

1261 f = GitFile(filename, "wb") 

1262 try: 

1263 if old_ref is not None: 

1264 orig_ref = self.read_loose_ref(name) 

1265 if orig_ref is None: 

1266 orig_ref = self.get_packed_refs().get(name) 

1267 if orig_ref is None: 

1268 orig_ref = ZERO_SHA 

1269 if orig_ref != old_ref: 

1270 return False 

1271 

1272 # remove the reference file itself 

1273 try: 

1274 found = os.path.lexists(filename) 

1275 except OSError: 

1276 # may only be packed, or otherwise unstorable 

1277 found = False 

1278 

1279 if found: 

1280 os.remove(filename) 

1281 

1282 self._remove_packed_ref(name) 

1283 self._log( 

1284 name, 

1285 old_ref, 

1286 None, 

1287 committer=committer, 

1288 timestamp=timestamp, 

1289 timezone=timezone, 

1290 message=message, 

1291 ) 

1292 finally: 

1293 # never write, we just wanted the lock 

1294 f.abort() 

1295 

1296 # outside of the lock, clean-up any parent directory that might now 

1297 # be empty. this ensures that re-creating a reference of the same 

1298 # name of what was previously a directory works as expected 

1299 parent = name 

1300 while True: 

1301 try: 

1302 parent_bytes, _ = parent.rsplit(b"/", 1) 

1303 parent = Ref(parent_bytes) 

1304 except ValueError: 

1305 break 

1306 

1307 if parent == b"refs": 

1308 break 

1309 parent_filename = self.refpath(parent) 

1310 try: 

1311 os.rmdir(parent_filename) 

1312 except OSError: 

1313 # this can be caused by the parent directory being 

1314 # removed by another process, being not empty, etc. 

1315 # in any case, this is non fatal because we already 

1316 # removed the reference, just ignore it 

1317 break 

1318 

1319 return True 

1320 

1321 def pack_refs(self, all: bool = False) -> None: 

1322 """Pack loose refs into packed-refs file. 

1323 

1324 Args: 

1325 all: If True, pack all refs. If False, only pack tags. 

1326 """ 

1327 refs_to_pack: dict[Ref, ObjectID | None] = {} 

1328 for ref in self.allkeys(): 

1329 if ref == HEADREF: 

1330 # Never pack HEAD 

1331 continue 

1332 if all or ref.startswith(LOCAL_TAG_PREFIX): 

1333 try: 

1334 sha = self[ref] 

1335 if sha: 

1336 refs_to_pack[ref] = sha 

1337 except KeyError: 

1338 # Broken ref, skip it 

1339 pass 

1340 

1341 if refs_to_pack: 

1342 self.add_packed_refs(refs_to_pack) 

1343 

1344 

1345def _split_ref_line(line: bytes) -> tuple[ObjectID, Ref]: 

1346 """Split a single ref line into a tuple of SHA1 and name.""" 

1347 fields = line.rstrip(b"\n\r").split(b" ") 

1348 if len(fields) != 2: 

1349 raise PackedRefsException(f"invalid ref line {line!r}") 

1350 sha, name = fields 

1351 if not valid_hexsha(sha): 

1352 raise PackedRefsException(f"Invalid hex sha {sha!r}") 

1353 if not check_ref_format(Ref(name)): 

1354 raise PackedRefsException(f"invalid ref name {name!r}") 

1355 return (ObjectID(sha), Ref(name)) 

1356 

1357 

1358def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[ObjectID, Ref]]: 

1359 """Read a packed refs file. 

1360 

1361 Args: 

1362 f: file-like object to read from 

1363 Returns: Iterator over tuples with SHA1s and ref names. 

1364 """ 

1365 for line in f: 

1366 if line.startswith(b"#"): 

1367 # Comment 

1368 continue 

1369 if line.startswith(b"^"): 

1370 raise PackedRefsException("found peeled ref in packed-refs without peeled") 

1371 yield _split_ref_line(line) 

1372 

1373 

1374def read_packed_refs_with_peeled( 

1375 f: IO[bytes], 

1376) -> Iterator[tuple[ObjectID, Ref, ObjectID | None]]: 

1377 """Read a packed refs file including peeled refs. 

1378 

1379 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples 

1380 with ref names, SHA1s, and peeled SHA1s (or None). 

1381 

1382 Args: 

1383 f: file-like object to read from, seek'ed to the second line 

1384 """ 

1385 last = None 

1386 for line in f: 

1387 if line.startswith(b"#"): 

1388 continue 

1389 line = line.rstrip(b"\r\n") 

1390 if line.startswith(b"^"): 

1391 if not last: 

1392 raise PackedRefsException("unexpected peeled ref line") 

1393 if not valid_hexsha(line[1:]): 

1394 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}") 

1395 sha, name = _split_ref_line(last) 

1396 last = None 

1397 yield (sha, name, ObjectID(line[1:])) 

1398 else: 

1399 if last: 

1400 sha, name = _split_ref_line(last) 

1401 yield (sha, name, None) 

1402 last = line 

1403 if last: 

1404 sha, name = _split_ref_line(last) 

1405 yield (sha, name, None) 

1406 

1407 

1408def write_packed_refs( 

1409 f: IO[bytes], 

1410 packed_refs: Mapping[Ref, ObjectID], 

1411 peeled_refs: Mapping[Ref, ObjectID] | None = None, 

1412) -> None: 

1413 """Write a packed refs file. 

1414 

1415 Args: 

1416 f: empty file-like object to write to 

1417 packed_refs: dict of refname to sha of packed refs to write 

1418 peeled_refs: dict of refname to peeled value of sha 

1419 """ 

1420 if peeled_refs is None: 

1421 peeled_refs = {} 

1422 else: 

1423 f.write(b"# pack-refs with: peeled\n") 

1424 for refname in sorted(packed_refs.keys()): 

1425 f.write(git_line(packed_refs[refname], refname)) 

1426 if refname in peeled_refs: 

1427 f.write(b"^" + peeled_refs[refname] + b"\n") 

1428 

1429 

1430def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]: 

1431 """Read info/refs file. 

1432 

1433 Args: 

1434 f: File-like object to read from 

1435 

1436 Returns: 

1437 Dictionary mapping ref names to SHA1s 

1438 """ 

1439 ret: dict[Ref, ObjectID] = {} 

1440 for line_no, line in enumerate(f.readlines(), 1): 

1441 stripped = line.rstrip(b"\r\n") 

1442 parts = stripped.split(b"\t", 1) 

1443 if len(parts) != 2: 

1444 raise ValueError( 

1445 f"Invalid info/refs format at line {line_no}: " 

1446 f"expected '<sha>\\t<refname>', got {stripped[:100]!r}" 

1447 ) 

1448 (sha, name) = parts 

1449 ret[Ref(name)] = ObjectID(sha) 

1450 return ret 

1451 

1452 

1453def is_local_branch(x: bytes) -> bool: 

1454 """Check if a ref name is a local branch.""" 

1455 return x.startswith(LOCAL_BRANCH_PREFIX) 

1456 

1457 

1458def local_branch_name(name: bytes) -> Ref: 

1459 """Build a full branch ref from a short name. 

1460 

1461 Args: 

1462 name: Short branch name (e.g., b"master") or full ref 

1463 

1464 Returns: 

1465 Full branch ref name (e.g., b"refs/heads/master") 

1466 

1467 Examples: 

1468 >>> local_branch_name(b"master") 

1469 b'refs/heads/master' 

1470 >>> local_branch_name(b"refs/heads/master") 

1471 b'refs/heads/master' 

1472 """ 

1473 if name.startswith(LOCAL_BRANCH_PREFIX): 

1474 return Ref(name) 

1475 return Ref(LOCAL_BRANCH_PREFIX + name) 

1476 

1477 

1478def local_tag_name(name: bytes) -> Ref: 

1479 """Build a full tag ref from a short name. 

1480 

1481 Args: 

1482 name: Short tag name (e.g., b"v1.0") or full ref 

1483 

1484 Returns: 

1485 Full tag ref name (e.g., b"refs/tags/v1.0") 

1486 

1487 Examples: 

1488 >>> local_tag_name(b"v1.0") 

1489 b'refs/tags/v1.0' 

1490 >>> local_tag_name(b"refs/tags/v1.0") 

1491 b'refs/tags/v1.0' 

1492 """ 

1493 if name.startswith(LOCAL_TAG_PREFIX): 

1494 return Ref(name) 

1495 return Ref(LOCAL_TAG_PREFIX + name) 

1496 

1497 

1498def local_replace_name(name: bytes) -> Ref: 

1499 """Build a full replace ref from a short name. 

1500 

1501 Args: 

1502 name: Short replace name (object SHA) or full ref 

1503 

1504 Returns: 

1505 Full replace ref name (e.g., b"refs/replace/<sha>") 

1506 

1507 Examples: 

1508 >>> local_replace_name(b"abc123") 

1509 b'refs/replace/abc123' 

1510 >>> local_replace_name(b"refs/replace/abc123") 

1511 b'refs/replace/abc123' 

1512 """ 

1513 if name.startswith(LOCAL_REPLACE_PREFIX): 

1514 return Ref(name) 

1515 return Ref(LOCAL_REPLACE_PREFIX + name) 

1516 

1517 

1518def extract_branch_name(ref: bytes) -> bytes: 

1519 """Extract branch name from a full branch ref. 

1520 

1521 Args: 

1522 ref: Full branch ref (e.g., b"refs/heads/master") 

1523 

1524 Returns: 

1525 Short branch name (e.g., b"master") 

1526 

1527 Raises: 

1528 ValueError: If ref is not a local branch 

1529 

1530 Examples: 

1531 >>> extract_branch_name(b"refs/heads/master") 

1532 b'master' 

1533 >>> extract_branch_name(b"refs/heads/feature/foo") 

1534 b'feature/foo' 

1535 """ 

1536 if not ref.startswith(LOCAL_BRANCH_PREFIX): 

1537 raise ValueError(f"Not a local branch ref: {ref!r}") 

1538 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1539 

1540 

1541def extract_tag_name(ref: bytes) -> bytes: 

1542 """Extract tag name from a full tag ref. 

1543 

1544 Args: 

1545 ref: Full tag ref (e.g., b"refs/tags/v1.0") 

1546 

1547 Returns: 

1548 Short tag name (e.g., b"v1.0") 

1549 

1550 Raises: 

1551 ValueError: If ref is not a local tag 

1552 

1553 Examples: 

1554 >>> extract_tag_name(b"refs/tags/v1.0") 

1555 b'v1.0' 

1556 """ 

1557 if not ref.startswith(LOCAL_TAG_PREFIX): 

1558 raise ValueError(f"Not a local tag ref: {ref!r}") 

1559 return ref[len(LOCAL_TAG_PREFIX) :] 

1560 

1561 

1562def shorten_ref_name(ref: bytes) -> bytes: 

1563 """Convert a full ref name to its short form. 

1564 

1565 Args: 

1566 ref: Full ref name (e.g., b"refs/heads/master") 

1567 

1568 Returns: 

1569 Short ref name (e.g., b"master") 

1570 

1571 Examples: 

1572 >>> shorten_ref_name(b"refs/heads/master") 

1573 b'master' 

1574 >>> shorten_ref_name(b"refs/remotes/origin/main") 

1575 b'origin/main' 

1576 >>> shorten_ref_name(b"refs/tags/v1.0") 

1577 b'v1.0' 

1578 >>> shorten_ref_name(b"HEAD") 

1579 b'HEAD' 

1580 """ 

1581 if ref.startswith(LOCAL_BRANCH_PREFIX): 

1582 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1583 elif ref.startswith(LOCAL_REMOTE_PREFIX): 

1584 return ref[len(LOCAL_REMOTE_PREFIX) :] 

1585 elif ref.startswith(LOCAL_TAG_PREFIX): 

1586 return ref[len(LOCAL_TAG_PREFIX) :] 

1587 return ref 

1588 

1589 

1590def _set_origin_head( 

1591 refs: RefsContainer, origin: bytes, origin_head: bytes | None 

1592) -> None: 

1593 # set refs/remotes/origin/HEAD 

1594 origin_base = b"refs/remotes/" + origin + b"/" 

1595 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1596 origin_ref = Ref(origin_base + HEADREF) 

1597 target_ref = Ref(origin_base + extract_branch_name(origin_head)) 

1598 if target_ref in refs: 

1599 refs.set_symbolic_ref(origin_ref, target_ref) 

1600 

1601 

1602def _set_default_branch( 

1603 refs: RefsContainer, 

1604 origin: bytes, 

1605 origin_head: bytes | None, 

1606 branch: bytes | None, 

1607 ref_message: bytes | None, 

1608) -> bytes: 

1609 """Set the default branch.""" 

1610 origin_base = b"refs/remotes/" + origin + b"/" 

1611 if branch: 

1612 origin_ref = Ref(origin_base + branch) 

1613 if origin_ref in refs: 

1614 local_ref = Ref(local_branch_name(branch)) 

1615 refs.add_if_new(local_ref, refs[origin_ref], ref_message) 

1616 head_ref = local_ref 

1617 elif Ref(local_tag_name(branch)) in refs: 

1618 head_ref = Ref(local_tag_name(branch)) 

1619 else: 

1620 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag") 

1621 elif origin_head: 

1622 head_ref = Ref(origin_head) 

1623 if origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1624 origin_ref = Ref(origin_base + extract_branch_name(origin_head)) 

1625 else: 

1626 origin_ref = Ref(origin_head) 

1627 try: 

1628 refs.add_if_new(head_ref, refs[origin_ref], ref_message) 

1629 except KeyError: 

1630 pass 

1631 else: 

1632 raise ValueError("neither origin_head nor branch are provided") 

1633 return head_ref 

1634 

1635 

1636def _set_head( 

1637 refs: RefsContainer, head_ref: bytes, ref_message: bytes | None 

1638) -> ObjectID | None: 

1639 if head_ref.startswith(LOCAL_TAG_PREFIX): 

1640 # detach HEAD at specified tag 

1641 head = refs[Ref(head_ref)] 

1642 if isinstance(head, Tag): 

1643 _cls, obj = head.object 

1644 head = obj.get_object(obj).id 

1645 del refs[HEADREF] 

1646 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1647 else: 

1648 # set HEAD to specific branch 

1649 try: 

1650 head = refs[Ref(head_ref)] 

1651 refs.set_symbolic_ref(HEADREF, Ref(head_ref)) 

1652 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1653 except KeyError: 

1654 head = None 

1655 return head 

1656 

1657 

1658def _import_remote_refs( 

1659 refs_container: RefsContainer, 

1660 remote_name: str, 

1661 refs: Mapping[Ref, ObjectID | None], 

1662 message: bytes | None = None, 

1663 prune: bool = False, 

1664 prune_tags: bool = False, 

1665) -> None: 

1666 from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs 

1667 

1668 stripped_refs = strip_peeled_refs(refs) 

1669 branches: dict[Ref, ObjectID | None] = { 

1670 Ref(extract_branch_name(n)): v 

1671 for (n, v) in stripped_refs.items() 

1672 if n.startswith(LOCAL_BRANCH_PREFIX) 

1673 } 

1674 refs_container.import_refs( 

1675 Ref(b"refs/remotes/" + remote_name.encode()), 

1676 branches, 

1677 message=message, 

1678 prune=prune, 

1679 ) 

1680 tags: dict[Ref, ObjectID | None] = { 

1681 Ref(extract_tag_name(n)): v 

1682 for (n, v) in stripped_refs.items() 

1683 if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX) 

1684 } 

1685 refs_container.import_refs( 

1686 Ref(LOCAL_TAG_PREFIX), tags, message=message, prune=prune_tags 

1687 ) 

1688 

1689 

1690class locked_ref: 

1691 """Lock a ref while making modifications. 

1692 

1693 Works as a context manager. 

1694 """ 

1695 

1696 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None: 

1697 """Initialize a locked ref. 

1698 

1699 Args: 

1700 refs_container: The DiskRefsContainer to lock the ref in 

1701 refname: The ref name to lock 

1702 """ 

1703 self._refs_container = refs_container 

1704 self._refname = refname 

1705 self._file: _GitFile | None = None 

1706 self._realname: Ref | None = None 

1707 self._deleted = False 

1708 

1709 def __enter__(self) -> "locked_ref": 

1710 """Enter the context manager and acquire the lock. 

1711 

1712 Returns: 

1713 This locked_ref instance 

1714 

1715 Raises: 

1716 OSError: If the lock cannot be acquired 

1717 """ 

1718 self._refs_container._check_refname(self._refname) 

1719 try: 

1720 realnames, _ = self._refs_container.follow(self._refname) 

1721 self._realname = realnames[-1] 

1722 except (KeyError, IndexError, SymrefLoop): 

1723 self._realname = self._refname 

1724 

1725 filename = self._refs_container.refpath(self._realname) 

1726 ensure_dir_exists(os.path.dirname(filename)) 

1727 f = GitFile(filename, "wb") 

1728 self._file = f 

1729 return self 

1730 

1731 def __exit__( 

1732 self, 

1733 exc_type: type | None, 

1734 exc_value: BaseException | None, 

1735 traceback: types.TracebackType | None, 

1736 ) -> None: 

1737 """Exit the context manager and release the lock. 

1738 

1739 Args: 

1740 exc_type: Type of exception if one occurred 

1741 exc_value: Exception instance if one occurred 

1742 traceback: Traceback if an exception occurred 

1743 """ 

1744 if self._file: 

1745 if exc_type is not None or self._deleted: 

1746 self._file.abort() 

1747 else: 

1748 self._file.close() 

1749 

1750 def get(self) -> bytes | None: 

1751 """Get the current value of the ref.""" 

1752 if not self._file: 

1753 raise RuntimeError("locked_ref not in context") 

1754 

1755 assert self._realname is not None 

1756 current_ref = self._refs_container.read_loose_ref(self._realname) 

1757 if current_ref is None: 

1758 current_ref = self._refs_container.get_packed_refs().get( 

1759 self._realname, None 

1760 ) 

1761 return current_ref 

1762 

1763 def ensure_equals(self, expected_value: bytes | None) -> bool: 

1764 """Ensure the ref currently equals the expected value. 

1765 

1766 Args: 

1767 expected_value: The expected current value of the ref 

1768 Returns: 

1769 True if the ref equals the expected value, False otherwise 

1770 """ 

1771 current_value = self.get() 

1772 return current_value == expected_value 

1773 

1774 def set(self, new_ref: bytes) -> None: 

1775 """Set the ref to a new value. 

1776 

1777 Args: 

1778 new_ref: The new SHA1 or symbolic ref value 

1779 """ 

1780 if not self._file: 

1781 raise RuntimeError("locked_ref not in context") 

1782 

1783 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)): 

1784 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref") 

1785 

1786 self._file.seek(0) 

1787 self._file.truncate() 

1788 self._file.write(new_ref + b"\n") 

1789 self._deleted = False 

1790 

1791 def set_symbolic_ref(self, target: Ref) -> None: 

1792 """Make this ref point at another ref. 

1793 

1794 Args: 

1795 target: Name of the ref to point at 

1796 """ 

1797 if not self._file: 

1798 raise RuntimeError("locked_ref not in context") 

1799 

1800 self._refs_container._check_refname(target) 

1801 self._file.seek(0) 

1802 self._file.truncate() 

1803 self._file.write(SYMREF + target + b"\n") 

1804 self._deleted = False 

1805 

1806 def delete(self) -> None: 

1807 """Delete the ref file while holding the lock.""" 

1808 if not self._file: 

1809 raise RuntimeError("locked_ref not in context") 

1810 

1811 # Delete the actual ref file while holding the lock 

1812 if self._realname: 

1813 filename = self._refs_container.refpath(self._realname) 

1814 try: 

1815 if os.path.lexists(filename): 

1816 os.remove(filename) 

1817 except FileNotFoundError: 

1818 pass 

1819 self._refs_container._remove_packed_ref(self._realname) 

1820 

1821 self._deleted = True 

1822 

1823 

1824class NamespacedRefsContainer(RefsContainer): 

1825 """Wrapper that adds namespace prefix to all ref operations. 

1826 

1827 This implements Git's GIT_NAMESPACE feature, which stores refs under 

1828 refs/namespaces/<namespace>/ and filters operations to only show refs 

1829 within that namespace. 

1830 

1831 Example: 

1832 With namespace "foo", a ref "refs/heads/master" is stored as 

1833 "refs/namespaces/foo/refs/heads/master" in the underlying container. 

1834 """ 

1835 

1836 def __init__(self, refs: RefsContainer, namespace: bytes) -> None: 

1837 """Initialize NamespacedRefsContainer. 

1838 

1839 Args: 

1840 refs: The underlying refs container to wrap 

1841 namespace: The namespace prefix (e.g., b"foo" or b"foo/bar") 

1842 """ 

1843 super().__init__(logger=refs._logger) 

1844 self._refs = refs 

1845 # Build namespace prefix: refs/namespaces/<namespace>/ 

1846 # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/ 

1847 namespace_parts = namespace.split(b"/") 

1848 self._namespace_prefix = b"" 

1849 for part in namespace_parts: 

1850 self._namespace_prefix += b"refs/namespaces/" + part + b"/" 

1851 

1852 def _apply_namespace(self, name: bytes) -> bytes: 

1853 """Apply namespace prefix to a ref name.""" 

1854 # HEAD and other special refs are not namespaced 

1855 if name == HEADREF or not name.startswith(b"refs/"): 

1856 return name 

1857 return self._namespace_prefix + name 

1858 

1859 def _strip_namespace(self, name: bytes) -> bytes | None: 

1860 """Remove namespace prefix from a ref name. 

1861 

1862 Returns None if the ref is not in our namespace. 

1863 """ 

1864 # HEAD and other special refs are not namespaced 

1865 if name == HEADREF or not name.startswith(b"refs/"): 

1866 return name 

1867 if name.startswith(self._namespace_prefix): 

1868 return name[len(self._namespace_prefix) :] 

1869 return None 

1870 

1871 def allkeys(self) -> set[Ref]: 

1872 """Return all reference keys in this namespace.""" 

1873 keys: set[Ref] = set() 

1874 for key in self._refs.allkeys(): 

1875 stripped = self._strip_namespace(key) 

1876 if stripped is not None: 

1877 keys.add(Ref(stripped)) 

1878 return keys 

1879 

1880 def read_loose_ref(self, name: Ref) -> bytes | None: 

1881 """Read a loose reference.""" 

1882 return self._refs.read_loose_ref(Ref(self._apply_namespace(name))) 

1883 

1884 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

1885 """Get packed refs within this namespace.""" 

1886 packed: dict[Ref, ObjectID] = {} 

1887 for name, value in self._refs.get_packed_refs().items(): 

1888 stripped = self._strip_namespace(name) 

1889 if stripped is not None: 

1890 packed[Ref(stripped)] = value 

1891 return packed 

1892 

1893 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

1894 """Add packed refs with namespace prefix.""" 

1895 namespaced_refs: dict[Ref, ObjectID | None] = { 

1896 Ref(self._apply_namespace(name)): value for name, value in new_refs.items() 

1897 } 

1898 self._refs.add_packed_refs(namespaced_refs) 

1899 

1900 def get_peeled(self, name: Ref) -> ObjectID | None: 

1901 """Return the cached peeled value of a ref.""" 

1902 return self._refs.get_peeled(Ref(self._apply_namespace(name))) 

1903 

1904 def set_symbolic_ref( 

1905 self, 

1906 name: Ref, 

1907 other: Ref, 

1908 committer: bytes | None = None, 

1909 timestamp: int | None = None, 

1910 timezone: int | None = None, 

1911 message: bytes | None = None, 

1912 ) -> None: 

1913 """Make a ref point at another ref.""" 

1914 self._refs.set_symbolic_ref( 

1915 Ref(self._apply_namespace(name)), 

1916 Ref(self._apply_namespace(other)), 

1917 committer=committer, 

1918 timestamp=timestamp, 

1919 timezone=timezone, 

1920 message=message, 

1921 ) 

1922 

1923 def set_if_equals( 

1924 self, 

1925 name: Ref, 

1926 old_ref: ObjectID | None, 

1927 new_ref: ObjectID, 

1928 committer: bytes | None = None, 

1929 timestamp: int | None = None, 

1930 timezone: int | None = None, 

1931 message: bytes | None = None, 

1932 ) -> bool: 

1933 """Set a refname to new_ref only if it currently equals old_ref.""" 

1934 return self._refs.set_if_equals( 

1935 Ref(self._apply_namespace(name)), 

1936 old_ref, 

1937 new_ref, 

1938 committer=committer, 

1939 timestamp=timestamp, 

1940 timezone=timezone, 

1941 message=message, 

1942 ) 

1943 

1944 def add_if_new( 

1945 self, 

1946 name: Ref, 

1947 ref: ObjectID, 

1948 committer: bytes | None = None, 

1949 timestamp: int | None = None, 

1950 timezone: int | None = None, 

1951 message: bytes | None = None, 

1952 ) -> bool: 

1953 """Add a new reference only if it does not already exist.""" 

1954 return self._refs.add_if_new( 

1955 Ref(self._apply_namespace(name)), 

1956 ref, 

1957 committer=committer, 

1958 timestamp=timestamp, 

1959 timezone=timezone, 

1960 message=message, 

1961 ) 

1962 

1963 def remove_if_equals( 

1964 self, 

1965 name: Ref, 

1966 old_ref: ObjectID | None, 

1967 committer: bytes | None = None, 

1968 timestamp: int | None = None, 

1969 timezone: int | None = None, 

1970 message: bytes | None = None, 

1971 ) -> bool: 

1972 """Remove a refname only if it currently equals old_ref.""" 

1973 return self._refs.remove_if_equals( 

1974 Ref(self._apply_namespace(name)), 

1975 old_ref, 

1976 committer=committer, 

1977 timestamp=timestamp, 

1978 timezone=timezone, 

1979 message=message, 

1980 ) 

1981 

1982 def pack_refs(self, all: bool = False) -> None: 

1983 """Pack loose refs into packed-refs file. 

1984 

1985 Note: This packs all refs in the underlying container, not just 

1986 those in the namespace. 

1987 """ 

1988 self._refs.pack_refs(all=all) 

1989 

1990 

1991def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T: 

1992 """Filter refs to only include those with a given prefix. 

1993 

1994 Args: 

1995 refs: A dictionary of refs. 

1996 prefixes: The prefixes to filter by. 

1997 """ 

1998 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)} 

1999 return filtered 

2000 

2001 

2002def is_per_worktree_ref(ref: bytes) -> bool: 

2003 """Returns whether a reference is stored per worktree or not. 

2004 

2005 Per-worktree references are: 

2006 - all pseudorefs, e.g. HEAD 

2007 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/" 

2008 

2009 All refs starting with "refs/" are shared, except for the ones listed above. 

2010 

2011 See https://git-scm.com/docs/git-worktree#_refs. 

2012 """ 

2013 return not ref.startswith(b"refs/") or ref.startswith( 

2014 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/") 

2015 )