Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

750 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25__all__ = [ 

26 "HEADREF", 

27 "LOCAL_BRANCH_PREFIX", 

28 "LOCAL_NOTES_PREFIX", 

29 "LOCAL_REMOTE_PREFIX", 

30 "LOCAL_REPLACE_PREFIX", 

31 "LOCAL_TAG_PREFIX", 

32 "SYMREF", 

33 "DictRefsContainer", 

34 "DiskRefsContainer", 

35 "NamespacedRefsContainer", 

36 "Ref", 

37 "RefsContainer", 

38 "SymrefLoop", 

39 "check_ref_format", 

40 "extract_branch_name", 

41 "extract_tag_name", 

42 "filter_ref_prefix", 

43 "is_local_branch", 

44 "is_per_worktree_ref", 

45 "local_branch_name", 

46 "local_replace_name", 

47 "local_tag_name", 

48 "parse_remote_ref", 

49 "parse_symref_value", 

50 "read_info_refs", 

51 "read_packed_refs", 

52 "read_packed_refs_with_peeled", 

53 "set_ref_from_raw", 

54 "shorten_ref_name", 

55 "write_packed_refs", 

56] 

57 

58import os 

59import types 

60from collections.abc import Callable, Iterable, Iterator, Mapping 

61from contextlib import suppress 

62from typing import ( 

63 IO, 

64 TYPE_CHECKING, 

65 Any, 

66 BinaryIO, 

67 NewType, 

68 TypeVar, 

69) 

70 

71if TYPE_CHECKING: 

72 from .file import _GitFile 

73 

74from .errors import PackedRefsException, RefFormatError 

75from .file import GitFile, ensure_dir_exists 

76from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

77 

78Ref = NewType("Ref", bytes) 

79 

80T = TypeVar("T", dict[Ref, ObjectID], dict[Ref, ObjectID | None]) 

81 

82HEADREF = Ref(b"HEAD") 

83SYMREF = b"ref: " 

84LOCAL_BRANCH_PREFIX = b"refs/heads/" 

85LOCAL_TAG_PREFIX = b"refs/tags/" 

86LOCAL_REMOTE_PREFIX = b"refs/remotes/" 

87LOCAL_NOTES_PREFIX = b"refs/notes/" 

88LOCAL_REPLACE_PREFIX = b"refs/replace/" 

89BAD_REF_CHARS: set[int] = set(b"\177 ~^:?*[") 

90 

91 

92class SymrefLoop(Exception): 

93 """There is a loop between one or more symrefs.""" 

94 

95 def __init__(self, ref: bytes, depth: int) -> None: 

96 """Initialize SymrefLoop exception.""" 

97 self.ref = ref 

98 self.depth = depth 

99 

100 

101def parse_symref_value(contents: bytes) -> bytes: 

102 """Parse a symref value. 

103 

104 Args: 

105 contents: Contents to parse 

106 Returns: Destination 

107 """ 

108 if contents.startswith(SYMREF): 

109 return contents[len(SYMREF) :].rstrip(b"\r\n") 

110 raise ValueError(contents) 

111 

112 

113def check_ref_format(refname: Ref) -> bool: 

114 """Check if a refname is correctly formatted. 

115 

116 Implements all the same rules as git-check-ref-format[1]. 

117 

118 [1] 

119 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html 

120 

121 Args: 

122 refname: The refname to check 

123 Returns: True if refname is valid, False otherwise 

124 """ 

125 # These could be combined into one big expression, but are listed 

126 # separately to parallel [1]. 

127 if b"/." in refname or refname.startswith(b"."): # type: ignore[comparison-overlap] 

128 return False 

129 if b"/" not in refname: # type: ignore[comparison-overlap] 

130 return False 

131 if b".." in refname: # type: ignore[comparison-overlap] 

132 return False 

133 for i, c in enumerate(refname): 

134 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS: 

135 return False 

136 if refname[-1] in b"/.": 

137 return False 

138 if refname.endswith(b".lock"): 

139 return False 

140 if b"@{" in refname: # type: ignore[comparison-overlap] 

141 return False 

142 if b"\\" in refname: # type: ignore[comparison-overlap] 

143 return False 

144 return True 

145 

146 

147def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]: 

148 """Parse a remote ref into remote name and branch name. 

149 

150 Args: 

151 ref: Remote ref like b"refs/remotes/origin/main" 

152 

153 Returns: 

154 Tuple of (remote_name, branch_name) 

155 

156 Raises: 

157 ValueError: If ref is not a valid remote ref 

158 """ 

159 if not ref.startswith(LOCAL_REMOTE_PREFIX): 

160 raise ValueError(f"Not a remote ref: {ref!r}") 

161 

162 # Remove the prefix 

163 remainder = ref[len(LOCAL_REMOTE_PREFIX) :] 

164 

165 # Split into remote name and branch name 

166 parts = remainder.split(b"/", 1) 

167 if len(parts) != 2: 

168 raise ValueError(f"Invalid remote ref format: {ref!r}") 

169 

170 remote_name, branch_name = parts 

171 return (remote_name, branch_name) 

172 

173 

174def set_ref_from_raw(refs: "RefsContainer", name: Ref, raw_ref: bytes) -> None: 

175 """Set a reference from a raw ref value. 

176 

177 This handles both symbolic refs (starting with 'ref: ') and direct ObjectID refs. 

178 

179 Args: 

180 refs: The RefsContainer to set the ref in 

181 name: The ref name to set 

182 raw_ref: The raw ref value (either a symbolic ref or an ObjectID) 

183 """ 

184 if raw_ref.startswith(SYMREF): 

185 # It's a symbolic ref 

186 target = Ref(raw_ref[len(SYMREF) :]) 

187 refs.set_symbolic_ref(name, target) 

188 else: 

189 # It's a direct ObjectID 

190 refs[name] = ObjectID(raw_ref) 

191 

192 

193class RefsContainer: 

194 """A container for refs.""" 

195 

196 def __init__( 

197 self, 

198 logger: Callable[ 

199 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

200 ] 

201 | None = None, 

202 ) -> None: 

203 """Initialize RefsContainer with optional logger function.""" 

204 self._logger = logger 

205 

206 def _log( 

207 self, 

208 ref: bytes, 

209 old_sha: bytes | None, 

210 new_sha: bytes | None, 

211 committer: bytes | None = None, 

212 timestamp: int | None = None, 

213 timezone: int | None = None, 

214 message: bytes | None = None, 

215 ) -> None: 

216 if self._logger is None: 

217 return 

218 if message is None: 

219 return 

220 # Use ZERO_SHA for None values, matching git behavior 

221 if old_sha is None: 

222 old_sha = ZERO_SHA 

223 if new_sha is None: 

224 new_sha = ZERO_SHA 

225 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message) 

226 

227 def set_symbolic_ref( 

228 self, 

229 name: Ref, 

230 other: Ref, 

231 committer: bytes | None = None, 

232 timestamp: int | None = None, 

233 timezone: int | None = None, 

234 message: bytes | None = None, 

235 ) -> None: 

236 """Make a ref point at another ref. 

237 

238 Args: 

239 name: Name of the ref to set 

240 other: Name of the ref to point at 

241 committer: Optional committer name/email 

242 timestamp: Optional timestamp 

243 timezone: Optional timezone 

244 message: Optional message 

245 """ 

246 raise NotImplementedError(self.set_symbolic_ref) 

247 

248 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

249 """Get contents of the packed-refs file. 

250 

251 Returns: Dictionary mapping ref names to SHA1s 

252 

253 Note: Will return an empty dictionary when no packed-refs file is 

254 present. 

255 """ 

256 raise NotImplementedError(self.get_packed_refs) 

257 

258 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

259 """Add the given refs as packed refs. 

260 

261 Args: 

262 new_refs: A mapping of ref names to targets; if a target is None that 

263 means remove the ref 

264 """ 

265 raise NotImplementedError(self.add_packed_refs) 

266 

267 def get_peeled(self, name: Ref) -> ObjectID | None: 

268 """Return the cached peeled value of a ref, if available. 

269 

270 Args: 

271 name: Name of the ref to peel 

272 Returns: The peeled value of the ref. If the ref is known not point to 

273 a tag, this will be the SHA the ref refers to. If the ref may point 

274 to a tag, but no cached information is available, None is returned. 

275 """ 

276 return None 

277 

278 def import_refs( 

279 self, 

280 base: Ref, 

281 other: Mapping[Ref, ObjectID | None], 

282 committer: bytes | None = None, 

283 timestamp: bytes | None = None, 

284 timezone: bytes | None = None, 

285 message: bytes | None = None, 

286 prune: bool = False, 

287 ) -> None: 

288 """Import refs from another repository. 

289 

290 Args: 

291 base: Base ref to import into (e.g., b'refs/remotes/origin') 

292 other: Dictionary of refs to import 

293 committer: Optional committer for reflog 

294 timestamp: Optional timestamp for reflog 

295 timezone: Optional timezone for reflog 

296 message: Optional message for reflog 

297 prune: If True, remove refs not in other 

298 """ 

299 if prune: 

300 to_delete = set(self.subkeys(base)) 

301 else: 

302 to_delete = set() 

303 for name, value in other.items(): 

304 if value is None: 

305 to_delete.add(name) 

306 else: 

307 self.set_if_equals( 

308 Ref(b"/".join((base, name))), None, value, message=message 

309 ) 

310 if to_delete: 

311 try: 

312 to_delete.remove(name) 

313 except KeyError: 

314 pass 

315 for ref in to_delete: 

316 self.remove_if_equals(Ref(b"/".join((base, ref))), None, message=message) 

317 

318 def allkeys(self) -> set[Ref]: 

319 """All refs present in this container.""" 

320 raise NotImplementedError(self.allkeys) 

321 

322 def __iter__(self) -> Iterator[Ref]: 

323 """Iterate over all reference keys.""" 

324 return iter(self.allkeys()) 

325 

326 def keys(self, base: Ref | None = None) -> set[Ref]: 

327 """Refs present in this container. 

328 

329 Args: 

330 base: An optional base to return refs under. 

331 Returns: An unsorted set of valid refs in this container, including 

332 packed refs. 

333 """ 

334 if base is not None: 

335 return self.subkeys(base) 

336 else: 

337 return self.allkeys() 

338 

339 def subkeys(self, base: Ref) -> set[Ref]: 

340 """Refs present in this container under a base. 

341 

342 Args: 

343 base: The base to return refs under. 

344 Returns: A set of valid refs in this container under the base; the base 

345 prefix is stripped from the ref names returned. 

346 """ 

347 keys: set[Ref] = set() 

348 base_len = len(base) + 1 

349 for refname in self.allkeys(): 

350 if refname.startswith(base): 

351 keys.add(Ref(refname[base_len:])) 

352 return keys 

353 

354 def as_dict(self, base: Ref | None = None) -> dict[Ref, ObjectID]: 

355 """Return the contents of this container as a dictionary.""" 

356 ret: dict[Ref, ObjectID] = {} 

357 keys = self.keys(base) 

358 base_bytes: bytes 

359 if base is None: 

360 base_bytes = b"" 

361 else: 

362 base_bytes = base.rstrip(b"/") 

363 for key in keys: 

364 try: 

365 ret[key] = self[Ref((base_bytes + b"/" + key).strip(b"/"))] 

366 except (SymrefLoop, KeyError): 

367 continue # Unable to resolve 

368 

369 return ret 

370 

371 def _check_refname(self, name: Ref) -> None: 

372 """Ensure a refname is valid and lives in refs or is HEAD. 

373 

374 HEAD is not a valid refname according to git-check-ref-format, but this 

375 class needs to be able to touch HEAD. Also, check_ref_format expects 

376 refnames without the leading 'refs/', but this class requires that 

377 so it cannot touch anything outside the refs dir (or HEAD). 

378 

379 Args: 

380 name: The name of the reference. 

381 

382 Raises: 

383 KeyError: if a refname is not HEAD or is otherwise not valid. 

384 """ 

385 if name in (HEADREF, Ref(b"refs/stash")): 

386 return 

387 if not name.startswith(b"refs/") or not check_ref_format(Ref(name[5:])): 

388 raise RefFormatError(name) 

389 

390 def read_ref(self, refname: Ref) -> bytes | None: 

391 """Read a reference without following any references. 

392 

393 Args: 

394 refname: The name of the reference 

395 Returns: The contents of the ref file, or None if it does 

396 not exist. 

397 """ 

398 contents = self.read_loose_ref(refname) 

399 if not contents: 

400 contents = self.get_packed_refs().get(refname, None) 

401 return contents 

402 

403 def read_loose_ref(self, name: Ref) -> bytes | None: 

404 """Read a loose reference and return its contents. 

405 

406 Args: 

407 name: the refname to read 

408 Returns: The contents of the ref file, or None if it does 

409 not exist. 

410 """ 

411 raise NotImplementedError(self.read_loose_ref) 

412 

413 def follow(self, name: Ref) -> tuple[list[Ref], ObjectID | None]: 

414 """Follow a reference name. 

415 

416 Returns: a tuple of (refnames, sha), wheres refnames are the names of 

417 references in the chain 

418 """ 

419 contents: bytes | None = SYMREF + name 

420 depth = 0 

421 refnames: list[Ref] = [] 

422 while contents and contents.startswith(SYMREF): 

423 refname = Ref(contents[len(SYMREF) :]) 

424 refnames.append(refname) 

425 contents = self.read_ref(refname) 

426 if not contents: 

427 break 

428 depth += 1 

429 if depth > 5: 

430 raise SymrefLoop(name, depth) 

431 return refnames, ObjectID(contents) if contents else None 

432 

433 def __contains__(self, refname: Ref) -> bool: 

434 """Check if a reference exists.""" 

435 if self.read_ref(refname): 

436 return True 

437 return False 

438 

439 def __getitem__(self, name: Ref) -> ObjectID: 

440 """Get the SHA1 for a reference name. 

441 

442 This method follows all symbolic references. 

443 """ 

444 _, sha = self.follow(name) 

445 if sha is None: 

446 raise KeyError(name) 

447 return sha 

448 

449 def set_if_equals( 

450 self, 

451 name: Ref, 

452 old_ref: ObjectID | None, 

453 new_ref: ObjectID, 

454 committer: bytes | None = None, 

455 timestamp: int | None = None, 

456 timezone: int | None = None, 

457 message: bytes | None = None, 

458 ) -> bool: 

459 """Set a refname to new_ref only if it currently equals old_ref. 

460 

461 This method follows all symbolic references if applicable for the 

462 subclass, and can be used to perform an atomic compare-and-swap 

463 operation. 

464 

465 Args: 

466 name: The refname to set. 

467 old_ref: The old sha the refname must refer to, or None to set 

468 unconditionally. 

469 new_ref: The new sha the refname will refer to. 

470 committer: Optional committer name/email 

471 timestamp: Optional timestamp 

472 timezone: Optional timezone 

473 message: Message for reflog 

474 Returns: True if the set was successful, False otherwise. 

475 """ 

476 raise NotImplementedError(self.set_if_equals) 

477 

478 def add_if_new( 

479 self, 

480 name: Ref, 

481 ref: ObjectID, 

482 committer: bytes | None = None, 

483 timestamp: int | None = None, 

484 timezone: int | None = None, 

485 message: bytes | None = None, 

486 ) -> bool: 

487 """Add a new reference only if it does not already exist. 

488 

489 Args: 

490 name: Ref name 

491 ref: Ref value 

492 committer: Optional committer name/email 

493 timestamp: Optional timestamp 

494 timezone: Optional timezone 

495 message: Optional message for reflog 

496 """ 

497 raise NotImplementedError(self.add_if_new) 

498 

499 def __setitem__(self, name: Ref, ref: ObjectID) -> None: 

500 """Set a reference name to point to the given SHA1. 

501 

502 This method follows all symbolic references if applicable for the 

503 subclass. 

504 

505 Note: This method unconditionally overwrites the contents of a 

506 reference. To update atomically only if the reference has not 

507 changed, use set_if_equals(). 

508 

509 Args: 

510 name: The refname to set. 

511 ref: The new sha the refname will refer to. 

512 """ 

513 if not (valid_hexsha(ref) or ref.startswith(SYMREF)): 

514 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref") 

515 self.set_if_equals(name, None, ref) 

516 

517 def remove_if_equals( 

518 self, 

519 name: Ref, 

520 old_ref: ObjectID | None, 

521 committer: bytes | None = None, 

522 timestamp: int | None = None, 

523 timezone: int | None = None, 

524 message: bytes | None = None, 

525 ) -> bool: 

526 """Remove a refname only if it currently equals old_ref. 

527 

528 This method does not follow symbolic references, even if applicable for 

529 the subclass. It can be used to perform an atomic compare-and-delete 

530 operation. 

531 

532 Args: 

533 name: The refname to delete. 

534 old_ref: The old sha the refname must refer to, or None to 

535 delete unconditionally. 

536 committer: Optional committer name/email 

537 timestamp: Optional timestamp 

538 timezone: Optional timezone 

539 message: Message for reflog 

540 Returns: True if the delete was successful, False otherwise. 

541 """ 

542 raise NotImplementedError(self.remove_if_equals) 

543 

544 def __delitem__(self, name: Ref) -> None: 

545 """Remove a refname. 

546 

547 This method does not follow symbolic references, even if applicable for 

548 the subclass. 

549 

550 Note: This method unconditionally deletes the contents of a reference. 

551 To delete atomically only if the reference has not changed, use 

552 remove_if_equals(). 

553 

554 Args: 

555 name: The refname to delete. 

556 """ 

557 self.remove_if_equals(name, None) 

558 

559 def get_symrefs(self) -> dict[Ref, Ref]: 

560 """Get a dict with all symrefs in this container. 

561 

562 Returns: Dictionary mapping source ref to target ref 

563 """ 

564 ret: dict[Ref, Ref] = {} 

565 for src in self.allkeys(): 

566 try: 

567 ref_value = self.read_ref(src) 

568 assert ref_value is not None 

569 dst = parse_symref_value(ref_value) 

570 except ValueError: 

571 pass 

572 else: 

573 ret[src] = Ref(dst) 

574 return ret 

575 

576 def pack_refs(self, all: bool = False) -> None: 

577 """Pack loose refs into packed-refs file. 

578 

579 Args: 

580 all: If True, pack all refs. If False, only pack tags. 

581 """ 

582 raise NotImplementedError(self.pack_refs) 

583 

584 

585class DictRefsContainer(RefsContainer): 

586 """RefsContainer backed by a simple dict. 

587 

588 This container does not support symbolic or packed references and is not 

589 threadsafe. 

590 """ 

591 

592 def __init__( 

593 self, 

594 refs: dict[Ref, bytes], 

595 logger: Callable[ 

596 [ 

597 bytes, 

598 bytes | None, 

599 bytes | None, 

600 bytes | None, 

601 int | None, 

602 int | None, 

603 bytes | None, 

604 ], 

605 None, 

606 ] 

607 | None = None, 

608 ) -> None: 

609 """Initialize DictRefsContainer with refs dictionary and optional logger.""" 

610 super().__init__(logger=logger) 

611 self._refs = refs 

612 self._peeled: dict[Ref, ObjectID] = {} 

613 self._watchers: set[Any] = set() 

614 

615 def allkeys(self) -> set[Ref]: 

616 """Return all reference keys.""" 

617 return set(self._refs.keys()) 

618 

619 def read_loose_ref(self, name: Ref) -> bytes | None: 

620 """Read a loose reference.""" 

621 return self._refs.get(name, None) 

622 

623 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

624 """Get packed references.""" 

625 return {} 

626 

627 def _notify(self, ref: bytes, newsha: bytes | None) -> None: 

628 for watcher in self._watchers: 

629 watcher._notify((ref, newsha)) 

630 

631 def set_symbolic_ref( 

632 self, 

633 name: Ref, 

634 other: Ref, 

635 committer: bytes | None = None, 

636 timestamp: int | None = None, 

637 timezone: int | None = None, 

638 message: bytes | None = None, 

639 ) -> None: 

640 """Make a ref point at another ref. 

641 

642 Args: 

643 name: Name of the ref to set 

644 other: Name of the ref to point at 

645 committer: Optional committer name for reflog 

646 timestamp: Optional timestamp for reflog 

647 timezone: Optional timezone for reflog 

648 message: Optional message for reflog 

649 """ 

650 old = self.follow(name)[-1] 

651 new = SYMREF + other 

652 self._refs[name] = new 

653 self._notify(name, new) 

654 self._log( 

655 name, 

656 old, 

657 new, 

658 committer=committer, 

659 timestamp=timestamp, 

660 timezone=timezone, 

661 message=message, 

662 ) 

663 

664 def set_if_equals( 

665 self, 

666 name: Ref, 

667 old_ref: ObjectID | None, 

668 new_ref: ObjectID, 

669 committer: bytes | None = None, 

670 timestamp: int | None = None, 

671 timezone: int | None = None, 

672 message: bytes | None = None, 

673 ) -> bool: 

674 """Set a refname to new_ref only if it currently equals old_ref. 

675 

676 This method follows all symbolic references, and can be used to perform 

677 an atomic compare-and-swap operation. 

678 

679 Args: 

680 name: The refname to set. 

681 old_ref: The old sha the refname must refer to, or None to set 

682 unconditionally. 

683 new_ref: The new sha the refname will refer to. 

684 committer: Optional committer name for reflog 

685 timestamp: Optional timestamp for reflog 

686 timezone: Optional timezone for reflog 

687 message: Optional message for reflog 

688 

689 Returns: 

690 True if the set was successful, False otherwise. 

691 """ 

692 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

693 return False 

694 # Only update the specific ref requested, not the whole chain 

695 self._check_refname(name) 

696 old = self._refs.get(name) 

697 self._refs[name] = new_ref 

698 self._notify(name, new_ref) 

699 self._log( 

700 name, 

701 old, 

702 new_ref, 

703 committer=committer, 

704 timestamp=timestamp, 

705 timezone=timezone, 

706 message=message, 

707 ) 

708 return True 

709 

710 def add_if_new( 

711 self, 

712 name: Ref, 

713 ref: ObjectID, 

714 committer: bytes | None = None, 

715 timestamp: int | None = None, 

716 timezone: int | None = None, 

717 message: bytes | None = None, 

718 ) -> bool: 

719 """Add a new reference only if it does not already exist. 

720 

721 Args: 

722 name: Ref name 

723 ref: Ref value 

724 committer: Optional committer name for reflog 

725 timestamp: Optional timestamp for reflog 

726 timezone: Optional timezone for reflog 

727 message: Optional message for reflog 

728 

729 Returns: 

730 True if the add was successful, False otherwise. 

731 """ 

732 if name in self._refs: 

733 return False 

734 self._refs[name] = ref 

735 self._notify(name, ref) 

736 self._log( 

737 name, 

738 None, 

739 ref, 

740 committer=committer, 

741 timestamp=timestamp, 

742 timezone=timezone, 

743 message=message, 

744 ) 

745 return True 

746 

747 def remove_if_equals( 

748 self, 

749 name: Ref, 

750 old_ref: ObjectID | None, 

751 committer: bytes | None = None, 

752 timestamp: int | None = None, 

753 timezone: int | None = None, 

754 message: bytes | None = None, 

755 ) -> bool: 

756 """Remove a refname only if it currently equals old_ref. 

757 

758 This method does not follow symbolic references. It can be used to 

759 perform an atomic compare-and-delete operation. 

760 

761 Args: 

762 name: The refname to delete. 

763 old_ref: The old sha the refname must refer to, or None to 

764 delete unconditionally. 

765 committer: Optional committer name for reflog 

766 timestamp: Optional timestamp for reflog 

767 timezone: Optional timezone for reflog 

768 message: Optional message for reflog 

769 

770 Returns: 

771 True if the delete was successful, False otherwise. 

772 """ 

773 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

774 return False 

775 try: 

776 old = self._refs.pop(name) 

777 except KeyError: 

778 pass 

779 else: 

780 self._notify(name, None) 

781 self._log( 

782 name, 

783 old, 

784 None, 

785 committer=committer, 

786 timestamp=timestamp, 

787 timezone=timezone, 

788 message=message, 

789 ) 

790 return True 

791 

792 def get_peeled(self, name: Ref) -> ObjectID | None: 

793 """Get peeled version of a reference.""" 

794 return self._peeled.get(name) 

795 

796 def _update(self, refs: Mapping[Ref, ObjectID]) -> None: 

797 """Update multiple refs; intended only for testing.""" 

798 # TODO(dborowitz): replace this with a public function that uses 

799 # set_if_equal. 

800 for ref, sha in refs.items(): 

801 self.set_if_equals(ref, None, sha) 

802 

803 def _update_peeled(self, peeled: Mapping[Ref, ObjectID]) -> None: 

804 """Update cached peeled refs; intended only for testing.""" 

805 self._peeled.update(peeled) 

806 

807 

808class DiskRefsContainer(RefsContainer): 

809 """Refs container that reads refs from disk.""" 

810 

811 def __init__( 

812 self, 

813 path: str | bytes | os.PathLike[str], 

814 worktree_path: str | bytes | os.PathLike[str] | None = None, 

815 logger: Callable[ 

816 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

817 ] 

818 | None = None, 

819 ) -> None: 

820 """Initialize DiskRefsContainer.""" 

821 super().__init__(logger=logger) 

822 # Convert path-like objects to strings, then to bytes for Git compatibility 

823 self.path = os.fsencode(os.fspath(path)) 

824 if worktree_path is None: 

825 self.worktree_path = self.path 

826 else: 

827 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

828 self._packed_refs: dict[Ref, ObjectID] | None = None 

829 self._peeled_refs: dict[Ref, ObjectID] | None = None 

830 

831 def __repr__(self) -> str: 

832 """Return string representation of DiskRefsContainer.""" 

833 return f"{self.__class__.__name__}({self.path!r})" 

834 

835 def _iter_dir( 

836 self, 

837 path: bytes, 

838 base: bytes, 

839 dir_filter: Callable[[bytes], bool] | None = None, 

840 ) -> Iterator[Ref]: 

841 refspath = os.path.join(path, base.rstrip(b"/")) 

842 prefix_len = len(os.path.join(path, b"")) 

843 

844 for root, dirs, files in os.walk(refspath): 

845 directory = root[prefix_len:] 

846 if os.path.sep != "/": 

847 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

848 if dir_filter is not None: 

849 dirs[:] = [ 

850 d for d in dirs if dir_filter(b"/".join([directory, d, b""])) 

851 ] 

852 

853 for filename in files: 

854 refname = b"/".join([directory, filename]) 

855 if check_ref_format(Ref(refname)): 

856 yield Ref(refname) 

857 

858 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[Ref]: 

859 base = base.rstrip(b"/") + b"/" 

860 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = [] 

861 if base != b"refs/": 

862 path = self.worktree_path if is_per_worktree_ref(base) else self.path 

863 search_paths.append((path, None)) 

864 elif self.worktree_path == self.path: 

865 # Iterate through all the refs from the main worktree 

866 search_paths.append((self.path, None)) 

867 else: 

868 # Iterate through all the shared refs from the commondir, excluding per-worktree refs 

869 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r))) 

870 # Iterate through all the per-worktree refs from the worktree's gitdir 

871 search_paths.append((self.worktree_path, is_per_worktree_ref)) 

872 

873 for path, dir_filter in search_paths: 

874 yield from self._iter_dir(path, base, dir_filter=dir_filter) 

875 

876 def subkeys(self, base: Ref) -> set[Ref]: 

877 """Return subkeys under a given base reference path.""" 

878 subkeys: set[Ref] = set() 

879 

880 for key in self._iter_loose_refs(base): 

881 if key.startswith(base): 

882 subkeys.add(Ref(key[len(base) :].strip(b"/"))) 

883 

884 for key in self.get_packed_refs(): 

885 if key.startswith(base): 

886 subkeys.add(Ref(key[len(base) :].strip(b"/"))) 

887 return subkeys 

888 

889 def allkeys(self) -> set[Ref]: 

890 """Return all reference keys.""" 

891 allkeys: set[Ref] = set() 

892 if os.path.exists(self.refpath(HEADREF)): 

893 allkeys.add(Ref(HEADREF)) 

894 

895 allkeys.update(self._iter_loose_refs()) 

896 allkeys.update(self.get_packed_refs()) 

897 return allkeys 

898 

899 def refpath(self, name: bytes) -> bytes: 

900 """Return the disk path of a ref.""" 

901 path = name 

902 if os.path.sep != "/": 

903 path = path.replace(b"/", os.fsencode(os.path.sep)) 

904 

905 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path 

906 return os.path.join(root_dir, path) 

907 

908 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

909 """Get contents of the packed-refs file. 

910 

911 Returns: Dictionary mapping ref names to SHA1s 

912 

913 Note: Will return an empty dictionary when no packed-refs file is 

914 present. 

915 """ 

916 # TODO: invalidate the cache on repacking 

917 if self._packed_refs is None: 

918 # set both to empty because we want _peeled_refs to be 

919 # None if and only if _packed_refs is also None. 

920 self._packed_refs = {} 

921 self._peeled_refs = {} 

922 path = os.path.join(self.path, b"packed-refs") 

923 try: 

924 f = GitFile(path, "rb") 

925 except FileNotFoundError: 

926 return {} 

927 with f: 

928 first_line = next(iter(f)).rstrip() 

929 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

930 for sha, name, peeled in read_packed_refs_with_peeled(f): 

931 self._packed_refs[name] = sha 

932 if peeled: 

933 self._peeled_refs[name] = peeled 

934 else: 

935 f.seek(0) 

936 for sha, name in read_packed_refs(f): 

937 self._packed_refs[name] = sha 

938 return self._packed_refs 

939 

940 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

941 """Add the given refs as packed refs. 

942 

943 Args: 

944 new_refs: A mapping of ref names to targets; if a target is None that 

945 means remove the ref 

946 """ 

947 if not new_refs: 

948 return 

949 

950 path = os.path.join(self.path, b"packed-refs") 

951 

952 with GitFile(path, "wb") as f: 

953 # reread cached refs from disk, while holding the lock 

954 packed_refs = self.get_packed_refs().copy() 

955 

956 for ref, target in new_refs.items(): 

957 # sanity check 

958 if ref == HEADREF: 

959 raise ValueError("cannot pack HEAD") 

960 

961 # remove any loose refs pointing to this one -- please 

962 # note that this bypasses remove_if_equals as we don't 

963 # want to affect packed refs in here 

964 with suppress(OSError): 

965 os.remove(self.refpath(ref)) 

966 

967 if target is not None: 

968 packed_refs[ref] = target 

969 else: 

970 packed_refs.pop(ref, None) 

971 

972 write_packed_refs(f, packed_refs, self._peeled_refs) 

973 

974 self._packed_refs = packed_refs 

975 

976 def get_peeled(self, name: Ref) -> ObjectID | None: 

977 """Return the cached peeled value of a ref, if available. 

978 

979 Args: 

980 name: Name of the ref to peel 

981 Returns: The peeled value of the ref. If the ref is known not point to 

982 a tag, this will be the SHA the ref refers to. If the ref may point 

983 to a tag, but no cached information is available, None is returned. 

984 """ 

985 self.get_packed_refs() 

986 if ( 

987 self._peeled_refs is None 

988 or self._packed_refs is None 

989 or name not in self._packed_refs 

990 ): 

991 # No cache: no peeled refs were read, or this ref is loose 

992 return None 

993 if name in self._peeled_refs: 

994 return self._peeled_refs[name] 

995 else: 

996 # Known not peelable 

997 return self[name] 

998 

999 def read_loose_ref(self, name: Ref) -> bytes | None: 

1000 """Read a reference file and return its contents. 

1001 

1002 If the reference file a symbolic reference, only read the first line of 

1003 the file. Otherwise, only read the first 40 bytes. 

1004 

1005 Args: 

1006 name: the refname to read, relative to refpath 

1007 Returns: The contents of the ref file, or None if the file does not 

1008 exist. 

1009 

1010 Raises: 

1011 IOError: if any other error occurs 

1012 """ 

1013 filename = self.refpath(name) 

1014 try: 

1015 with GitFile(filename, "rb") as f: 

1016 header = f.read(len(SYMREF)) 

1017 if header == SYMREF: 

1018 # Read only the first line 

1019 return header + next(iter(f)).rstrip(b"\r\n") 

1020 else: 

1021 # Read only the first 40 bytes 

1022 return header + f.read(40 - len(SYMREF)) 

1023 except (OSError, UnicodeError): 

1024 # don't assume anything specific about the error; in 

1025 # particular, invalid or forbidden paths can raise weird 

1026 # errors depending on the specific operating system 

1027 return None 

1028 

1029 def _remove_packed_ref(self, name: Ref) -> None: 

1030 if self._packed_refs is None: 

1031 return 

1032 filename = os.path.join(self.path, b"packed-refs") 

1033 # reread cached refs from disk, while holding the lock 

1034 f = GitFile(filename, "wb") 

1035 try: 

1036 self._packed_refs = None 

1037 self.get_packed_refs() 

1038 

1039 if self._packed_refs is None or name not in self._packed_refs: 

1040 f.abort() 

1041 return 

1042 

1043 del self._packed_refs[name] 

1044 if self._peeled_refs is not None: 

1045 with suppress(KeyError): 

1046 del self._peeled_refs[name] 

1047 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

1048 f.close() 

1049 except BaseException: 

1050 f.abort() 

1051 raise 

1052 

1053 def set_symbolic_ref( 

1054 self, 

1055 name: Ref, 

1056 other: Ref, 

1057 committer: bytes | None = None, 

1058 timestamp: int | None = None, 

1059 timezone: int | None = None, 

1060 message: bytes | None = None, 

1061 ) -> None: 

1062 """Make a ref point at another ref. 

1063 

1064 Args: 

1065 name: Name of the ref to set 

1066 other: Name of the ref to point at 

1067 committer: Optional committer name 

1068 timestamp: Optional timestamp 

1069 timezone: Optional timezone 

1070 message: Optional message to describe the change 

1071 """ 

1072 self._check_refname(name) 

1073 self._check_refname(other) 

1074 filename = self.refpath(name) 

1075 f = GitFile(filename, "wb") 

1076 try: 

1077 f.write(SYMREF + other + b"\n") 

1078 sha = self.follow(name)[-1] 

1079 self._log( 

1080 name, 

1081 sha, 

1082 sha, 

1083 committer=committer, 

1084 timestamp=timestamp, 

1085 timezone=timezone, 

1086 message=message, 

1087 ) 

1088 except BaseException: 

1089 f.abort() 

1090 raise 

1091 else: 

1092 f.close() 

1093 

1094 def set_if_equals( 

1095 self, 

1096 name: Ref, 

1097 old_ref: ObjectID | None, 

1098 new_ref: ObjectID, 

1099 committer: bytes | None = None, 

1100 timestamp: int | None = None, 

1101 timezone: int | None = None, 

1102 message: bytes | None = None, 

1103 ) -> bool: 

1104 """Set a refname to new_ref only if it currently equals old_ref. 

1105 

1106 This method follows all symbolic references, and can be used to perform 

1107 an atomic compare-and-swap operation. 

1108 

1109 Args: 

1110 name: The refname to set. 

1111 old_ref: The old sha the refname must refer to, or None to set 

1112 unconditionally. 

1113 new_ref: The new sha the refname will refer to. 

1114 committer: Optional committer name 

1115 timestamp: Optional timestamp 

1116 timezone: Optional timezone 

1117 message: Set message for reflog 

1118 Returns: True if the set was successful, False otherwise. 

1119 """ 

1120 self._check_refname(name) 

1121 try: 

1122 realnames, _ = self.follow(name) 

1123 realname = realnames[-1] 

1124 except (KeyError, IndexError, SymrefLoop): 

1125 realname = name 

1126 filename = self.refpath(realname) 

1127 

1128 # make sure none of the ancestor folders is in packed refs 

1129 probe_ref = Ref(os.path.dirname(realname)) 

1130 packed_refs = self.get_packed_refs() 

1131 while probe_ref: 

1132 if packed_refs.get(probe_ref, None) is not None: 

1133 raise NotADirectoryError(filename) 

1134 probe_ref = Ref(os.path.dirname(probe_ref)) 

1135 

1136 ensure_dir_exists(os.path.dirname(filename)) 

1137 with GitFile(filename, "wb") as f: 

1138 if old_ref is not None: 

1139 try: 

1140 # read again while holding the lock to handle race conditions 

1141 orig_ref = self.read_loose_ref(realname) 

1142 if orig_ref is None: 

1143 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA) 

1144 if orig_ref != old_ref: 

1145 f.abort() 

1146 return False 

1147 except OSError: 

1148 f.abort() 

1149 raise 

1150 

1151 # Check if ref already has the desired value while holding the lock 

1152 # This avoids fsync when ref is unchanged but still detects lock conflicts 

1153 current_ref = self.read_loose_ref(realname) 

1154 if current_ref is None: 

1155 current_ref = packed_refs.get(realname, None) 

1156 

1157 if current_ref is not None and current_ref == new_ref: 

1158 # Ref already has desired value, abort write to avoid fsync 

1159 f.abort() 

1160 return True 

1161 

1162 try: 

1163 f.write(new_ref + b"\n") 

1164 except OSError: 

1165 f.abort() 

1166 raise 

1167 self._log( 

1168 realname, 

1169 old_ref, 

1170 new_ref, 

1171 committer=committer, 

1172 timestamp=timestamp, 

1173 timezone=timezone, 

1174 message=message, 

1175 ) 

1176 return True 

1177 

1178 def add_if_new( 

1179 self, 

1180 name: Ref, 

1181 ref: ObjectID, 

1182 committer: bytes | None = None, 

1183 timestamp: int | None = None, 

1184 timezone: int | None = None, 

1185 message: bytes | None = None, 

1186 ) -> bool: 

1187 """Add a new reference only if it does not already exist. 

1188 

1189 This method follows symrefs, and only ensures that the last ref in the 

1190 chain does not exist. 

1191 

1192 Args: 

1193 name: The refname to set. 

1194 ref: The new sha the refname will refer to. 

1195 committer: Optional committer name 

1196 timestamp: Optional timestamp 

1197 timezone: Optional timezone 

1198 message: Optional message for reflog 

1199 Returns: True if the add was successful, False otherwise. 

1200 """ 

1201 try: 

1202 realnames, contents = self.follow(name) 

1203 if contents is not None: 

1204 return False 

1205 realname = realnames[-1] 

1206 except (KeyError, IndexError): 

1207 realname = name 

1208 self._check_refname(realname) 

1209 filename = self.refpath(realname) 

1210 ensure_dir_exists(os.path.dirname(filename)) 

1211 with GitFile(filename, "wb") as f: 

1212 if os.path.exists(filename) or name in self.get_packed_refs(): 

1213 f.abort() 

1214 return False 

1215 try: 

1216 f.write(ref + b"\n") 

1217 except OSError: 

1218 f.abort() 

1219 raise 

1220 else: 

1221 self._log( 

1222 name, 

1223 None, 

1224 ref, 

1225 committer=committer, 

1226 timestamp=timestamp, 

1227 timezone=timezone, 

1228 message=message, 

1229 ) 

1230 return True 

1231 

1232 def remove_if_equals( 

1233 self, 

1234 name: Ref, 

1235 old_ref: ObjectID | None, 

1236 committer: bytes | None = None, 

1237 timestamp: int | None = None, 

1238 timezone: int | None = None, 

1239 message: bytes | None = None, 

1240 ) -> bool: 

1241 """Remove a refname only if it currently equals old_ref. 

1242 

1243 This method does not follow symbolic references. It can be used to 

1244 perform an atomic compare-and-delete operation. 

1245 

1246 Args: 

1247 name: The refname to delete. 

1248 old_ref: The old sha the refname must refer to, or None to 

1249 delete unconditionally. 

1250 committer: Optional committer name 

1251 timestamp: Optional timestamp 

1252 timezone: Optional timezone 

1253 message: Optional message 

1254 Returns: True if the delete was successful, False otherwise. 

1255 """ 

1256 self._check_refname(name) 

1257 filename = self.refpath(name) 

1258 ensure_dir_exists(os.path.dirname(filename)) 

1259 f = GitFile(filename, "wb") 

1260 try: 

1261 if old_ref is not None: 

1262 orig_ref = self.read_loose_ref(name) 

1263 if orig_ref is None: 

1264 orig_ref = self.get_packed_refs().get(name) 

1265 if orig_ref is None: 

1266 orig_ref = ZERO_SHA 

1267 if orig_ref != old_ref: 

1268 return False 

1269 

1270 # remove the reference file itself 

1271 try: 

1272 found = os.path.lexists(filename) 

1273 except OSError: 

1274 # may only be packed, or otherwise unstorable 

1275 found = False 

1276 

1277 if found: 

1278 os.remove(filename) 

1279 

1280 self._remove_packed_ref(name) 

1281 self._log( 

1282 name, 

1283 old_ref, 

1284 None, 

1285 committer=committer, 

1286 timestamp=timestamp, 

1287 timezone=timezone, 

1288 message=message, 

1289 ) 

1290 finally: 

1291 # never write, we just wanted the lock 

1292 f.abort() 

1293 

1294 # outside of the lock, clean-up any parent directory that might now 

1295 # be empty. this ensures that re-creating a reference of the same 

1296 # name of what was previously a directory works as expected 

1297 parent = name 

1298 while True: 

1299 try: 

1300 parent_bytes, _ = parent.rsplit(b"/", 1) 

1301 parent = Ref(parent_bytes) 

1302 except ValueError: 

1303 break 

1304 

1305 if parent == b"refs": 

1306 break 

1307 parent_filename = self.refpath(parent) 

1308 try: 

1309 os.rmdir(parent_filename) 

1310 except OSError: 

1311 # this can be caused by the parent directory being 

1312 # removed by another process, being not empty, etc. 

1313 # in any case, this is non fatal because we already 

1314 # removed the reference, just ignore it 

1315 break 

1316 

1317 return True 

1318 

1319 def pack_refs(self, all: bool = False) -> None: 

1320 """Pack loose refs into packed-refs file. 

1321 

1322 Args: 

1323 all: If True, pack all refs. If False, only pack tags. 

1324 """ 

1325 refs_to_pack: dict[Ref, ObjectID | None] = {} 

1326 for ref in self.allkeys(): 

1327 if ref == HEADREF: 

1328 # Never pack HEAD 

1329 continue 

1330 if all or ref.startswith(LOCAL_TAG_PREFIX): 

1331 try: 

1332 sha = self[ref] 

1333 if sha: 

1334 refs_to_pack[ref] = sha 

1335 except KeyError: 

1336 # Broken ref, skip it 

1337 pass 

1338 

1339 if refs_to_pack: 

1340 self.add_packed_refs(refs_to_pack) 

1341 

1342 

1343def _split_ref_line(line: bytes) -> tuple[ObjectID, Ref]: 

1344 """Split a single ref line into a tuple of SHA1 and name.""" 

1345 fields = line.rstrip(b"\n\r").split(b" ") 

1346 if len(fields) != 2: 

1347 raise PackedRefsException(f"invalid ref line {line!r}") 

1348 sha, name = fields 

1349 if not valid_hexsha(sha): 

1350 raise PackedRefsException(f"Invalid hex sha {sha!r}") 

1351 if not check_ref_format(Ref(name)): 

1352 raise PackedRefsException(f"invalid ref name {name!r}") 

1353 return (ObjectID(sha), Ref(name)) 

1354 

1355 

1356def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[ObjectID, Ref]]: 

1357 """Read a packed refs file. 

1358 

1359 Args: 

1360 f: file-like object to read from 

1361 Returns: Iterator over tuples with SHA1s and ref names. 

1362 """ 

1363 for line in f: 

1364 if line.startswith(b"#"): 

1365 # Comment 

1366 continue 

1367 if line.startswith(b"^"): 

1368 raise PackedRefsException("found peeled ref in packed-refs without peeled") 

1369 yield _split_ref_line(line) 

1370 

1371 

1372def read_packed_refs_with_peeled( 

1373 f: IO[bytes], 

1374) -> Iterator[tuple[ObjectID, Ref, ObjectID | None]]: 

1375 """Read a packed refs file including peeled refs. 

1376 

1377 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples 

1378 with ref names, SHA1s, and peeled SHA1s (or None). 

1379 

1380 Args: 

1381 f: file-like object to read from, seek'ed to the second line 

1382 """ 

1383 last = None 

1384 for line in f: 

1385 if line.startswith(b"#"): 

1386 continue 

1387 line = line.rstrip(b"\r\n") 

1388 if line.startswith(b"^"): 

1389 if not last: 

1390 raise PackedRefsException("unexpected peeled ref line") 

1391 if not valid_hexsha(line[1:]): 

1392 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}") 

1393 sha, name = _split_ref_line(last) 

1394 last = None 

1395 yield (sha, name, ObjectID(line[1:])) 

1396 else: 

1397 if last: 

1398 sha, name = _split_ref_line(last) 

1399 yield (sha, name, None) 

1400 last = line 

1401 if last: 

1402 sha, name = _split_ref_line(last) 

1403 yield (sha, name, None) 

1404 

1405 

1406def write_packed_refs( 

1407 f: IO[bytes], 

1408 packed_refs: Mapping[Ref, ObjectID], 

1409 peeled_refs: Mapping[Ref, ObjectID] | None = None, 

1410) -> None: 

1411 """Write a packed refs file. 

1412 

1413 Args: 

1414 f: empty file-like object to write to 

1415 packed_refs: dict of refname to sha of packed refs to write 

1416 peeled_refs: dict of refname to peeled value of sha 

1417 """ 

1418 if peeled_refs is None: 

1419 peeled_refs = {} 

1420 else: 

1421 f.write(b"# pack-refs with: peeled\n") 

1422 for refname in sorted(packed_refs.keys()): 

1423 f.write(git_line(packed_refs[refname], refname)) 

1424 if refname in peeled_refs: 

1425 f.write(b"^" + peeled_refs[refname] + b"\n") 

1426 

1427 

1428def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]: 

1429 """Read info/refs file. 

1430 

1431 Args: 

1432 f: File-like object to read from 

1433 

1434 Returns: 

1435 Dictionary mapping ref names to SHA1s 

1436 """ 

1437 ret: dict[Ref, ObjectID] = {} 

1438 for line in f.readlines(): 

1439 (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1) 

1440 ret[Ref(name)] = ObjectID(sha) 

1441 return ret 

1442 

1443 

1444def is_local_branch(x: bytes) -> bool: 

1445 """Check if a ref name is a local branch.""" 

1446 return x.startswith(LOCAL_BRANCH_PREFIX) 

1447 

1448 

1449def local_branch_name(name: bytes) -> Ref: 

1450 """Build a full branch ref from a short name. 

1451 

1452 Args: 

1453 name: Short branch name (e.g., b"master") or full ref 

1454 

1455 Returns: 

1456 Full branch ref name (e.g., b"refs/heads/master") 

1457 

1458 Examples: 

1459 >>> local_branch_name(b"master") 

1460 b'refs/heads/master' 

1461 >>> local_branch_name(b"refs/heads/master") 

1462 b'refs/heads/master' 

1463 """ 

1464 if name.startswith(LOCAL_BRANCH_PREFIX): 

1465 return Ref(name) 

1466 return Ref(LOCAL_BRANCH_PREFIX + name) 

1467 

1468 

1469def local_tag_name(name: bytes) -> Ref: 

1470 """Build a full tag ref from a short name. 

1471 

1472 Args: 

1473 name: Short tag name (e.g., b"v1.0") or full ref 

1474 

1475 Returns: 

1476 Full tag ref name (e.g., b"refs/tags/v1.0") 

1477 

1478 Examples: 

1479 >>> local_tag_name(b"v1.0") 

1480 b'refs/tags/v1.0' 

1481 >>> local_tag_name(b"refs/tags/v1.0") 

1482 b'refs/tags/v1.0' 

1483 """ 

1484 if name.startswith(LOCAL_TAG_PREFIX): 

1485 return Ref(name) 

1486 return Ref(LOCAL_TAG_PREFIX + name) 

1487 

1488 

1489def local_replace_name(name: bytes) -> Ref: 

1490 """Build a full replace ref from a short name. 

1491 

1492 Args: 

1493 name: Short replace name (object SHA) or full ref 

1494 

1495 Returns: 

1496 Full replace ref name (e.g., b"refs/replace/<sha>") 

1497 

1498 Examples: 

1499 >>> local_replace_name(b"abc123") 

1500 b'refs/replace/abc123' 

1501 >>> local_replace_name(b"refs/replace/abc123") 

1502 b'refs/replace/abc123' 

1503 """ 

1504 if name.startswith(LOCAL_REPLACE_PREFIX): 

1505 return Ref(name) 

1506 return Ref(LOCAL_REPLACE_PREFIX + name) 

1507 

1508 

1509def extract_branch_name(ref: bytes) -> bytes: 

1510 """Extract branch name from a full branch ref. 

1511 

1512 Args: 

1513 ref: Full branch ref (e.g., b"refs/heads/master") 

1514 

1515 Returns: 

1516 Short branch name (e.g., b"master") 

1517 

1518 Raises: 

1519 ValueError: If ref is not a local branch 

1520 

1521 Examples: 

1522 >>> extract_branch_name(b"refs/heads/master") 

1523 b'master' 

1524 >>> extract_branch_name(b"refs/heads/feature/foo") 

1525 b'feature/foo' 

1526 """ 

1527 if not ref.startswith(LOCAL_BRANCH_PREFIX): 

1528 raise ValueError(f"Not a local branch ref: {ref!r}") 

1529 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1530 

1531 

1532def extract_tag_name(ref: bytes) -> bytes: 

1533 """Extract tag name from a full tag ref. 

1534 

1535 Args: 

1536 ref: Full tag ref (e.g., b"refs/tags/v1.0") 

1537 

1538 Returns: 

1539 Short tag name (e.g., b"v1.0") 

1540 

1541 Raises: 

1542 ValueError: If ref is not a local tag 

1543 

1544 Examples: 

1545 >>> extract_tag_name(b"refs/tags/v1.0") 

1546 b'v1.0' 

1547 """ 

1548 if not ref.startswith(LOCAL_TAG_PREFIX): 

1549 raise ValueError(f"Not a local tag ref: {ref!r}") 

1550 return ref[len(LOCAL_TAG_PREFIX) :] 

1551 

1552 

1553def shorten_ref_name(ref: bytes) -> bytes: 

1554 """Convert a full ref name to its short form. 

1555 

1556 Args: 

1557 ref: Full ref name (e.g., b"refs/heads/master") 

1558 

1559 Returns: 

1560 Short ref name (e.g., b"master") 

1561 

1562 Examples: 

1563 >>> shorten_ref_name(b"refs/heads/master") 

1564 b'master' 

1565 >>> shorten_ref_name(b"refs/remotes/origin/main") 

1566 b'origin/main' 

1567 >>> shorten_ref_name(b"refs/tags/v1.0") 

1568 b'v1.0' 

1569 >>> shorten_ref_name(b"HEAD") 

1570 b'HEAD' 

1571 """ 

1572 if ref.startswith(LOCAL_BRANCH_PREFIX): 

1573 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1574 elif ref.startswith(LOCAL_REMOTE_PREFIX): 

1575 return ref[len(LOCAL_REMOTE_PREFIX) :] 

1576 elif ref.startswith(LOCAL_TAG_PREFIX): 

1577 return ref[len(LOCAL_TAG_PREFIX) :] 

1578 return ref 

1579 

1580 

1581def _set_origin_head( 

1582 refs: RefsContainer, origin: bytes, origin_head: bytes | None 

1583) -> None: 

1584 # set refs/remotes/origin/HEAD 

1585 origin_base = b"refs/remotes/" + origin + b"/" 

1586 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1587 origin_ref = Ref(origin_base + HEADREF) 

1588 target_ref = Ref(origin_base + extract_branch_name(origin_head)) 

1589 if target_ref in refs: 

1590 refs.set_symbolic_ref(origin_ref, target_ref) 

1591 

1592 

1593def _set_default_branch( 

1594 refs: RefsContainer, 

1595 origin: bytes, 

1596 origin_head: bytes | None, 

1597 branch: bytes | None, 

1598 ref_message: bytes | None, 

1599) -> bytes: 

1600 """Set the default branch.""" 

1601 origin_base = b"refs/remotes/" + origin + b"/" 

1602 if branch: 

1603 origin_ref = Ref(origin_base + branch) 

1604 if origin_ref in refs: 

1605 local_ref = Ref(local_branch_name(branch)) 

1606 refs.add_if_new(local_ref, refs[origin_ref], ref_message) 

1607 head_ref = local_ref 

1608 elif Ref(local_tag_name(branch)) in refs: 

1609 head_ref = Ref(local_tag_name(branch)) 

1610 else: 

1611 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag") 

1612 elif origin_head: 

1613 head_ref = Ref(origin_head) 

1614 if origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1615 origin_ref = Ref(origin_base + extract_branch_name(origin_head)) 

1616 else: 

1617 origin_ref = Ref(origin_head) 

1618 try: 

1619 refs.add_if_new(head_ref, refs[origin_ref], ref_message) 

1620 except KeyError: 

1621 pass 

1622 else: 

1623 raise ValueError("neither origin_head nor branch are provided") 

1624 return head_ref 

1625 

1626 

1627def _set_head( 

1628 refs: RefsContainer, head_ref: bytes, ref_message: bytes | None 

1629) -> ObjectID | None: 

1630 if head_ref.startswith(LOCAL_TAG_PREFIX): 

1631 # detach HEAD at specified tag 

1632 head = refs[Ref(head_ref)] 

1633 if isinstance(head, Tag): 

1634 _cls, obj = head.object 

1635 head = obj.get_object(obj).id 

1636 del refs[HEADREF] 

1637 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1638 else: 

1639 # set HEAD to specific branch 

1640 try: 

1641 head = refs[Ref(head_ref)] 

1642 refs.set_symbolic_ref(HEADREF, Ref(head_ref)) 

1643 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1644 except KeyError: 

1645 head = None 

1646 return head 

1647 

1648 

1649def _import_remote_refs( 

1650 refs_container: RefsContainer, 

1651 remote_name: str, 

1652 refs: Mapping[Ref, ObjectID | None], 

1653 message: bytes | None = None, 

1654 prune: bool = False, 

1655 prune_tags: bool = False, 

1656) -> None: 

1657 from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs 

1658 

1659 stripped_refs = strip_peeled_refs(refs) 

1660 branches: dict[Ref, ObjectID | None] = { 

1661 Ref(extract_branch_name(n)): v 

1662 for (n, v) in stripped_refs.items() 

1663 if n.startswith(LOCAL_BRANCH_PREFIX) 

1664 } 

1665 refs_container.import_refs( 

1666 Ref(b"refs/remotes/" + remote_name.encode()), 

1667 branches, 

1668 message=message, 

1669 prune=prune, 

1670 ) 

1671 tags: dict[Ref, ObjectID | None] = { 

1672 Ref(extract_tag_name(n)): v 

1673 for (n, v) in stripped_refs.items() 

1674 if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX) 

1675 } 

1676 refs_container.import_refs( 

1677 Ref(LOCAL_TAG_PREFIX), tags, message=message, prune=prune_tags 

1678 ) 

1679 

1680 

1681class locked_ref: 

1682 """Lock a ref while making modifications. 

1683 

1684 Works as a context manager. 

1685 """ 

1686 

1687 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None: 

1688 """Initialize a locked ref. 

1689 

1690 Args: 

1691 refs_container: The DiskRefsContainer to lock the ref in 

1692 refname: The ref name to lock 

1693 """ 

1694 self._refs_container = refs_container 

1695 self._refname = refname 

1696 self._file: _GitFile | None = None 

1697 self._realname: Ref | None = None 

1698 self._deleted = False 

1699 

1700 def __enter__(self) -> "locked_ref": 

1701 """Enter the context manager and acquire the lock. 

1702 

1703 Returns: 

1704 This locked_ref instance 

1705 

1706 Raises: 

1707 OSError: If the lock cannot be acquired 

1708 """ 

1709 self._refs_container._check_refname(self._refname) 

1710 try: 

1711 realnames, _ = self._refs_container.follow(self._refname) 

1712 self._realname = realnames[-1] 

1713 except (KeyError, IndexError, SymrefLoop): 

1714 self._realname = self._refname 

1715 

1716 filename = self._refs_container.refpath(self._realname) 

1717 ensure_dir_exists(os.path.dirname(filename)) 

1718 f = GitFile(filename, "wb") 

1719 self._file = f 

1720 return self 

1721 

1722 def __exit__( 

1723 self, 

1724 exc_type: type | None, 

1725 exc_value: BaseException | None, 

1726 traceback: types.TracebackType | None, 

1727 ) -> None: 

1728 """Exit the context manager and release the lock. 

1729 

1730 Args: 

1731 exc_type: Type of exception if one occurred 

1732 exc_value: Exception instance if one occurred 

1733 traceback: Traceback if an exception occurred 

1734 """ 

1735 if self._file: 

1736 if exc_type is not None or self._deleted: 

1737 self._file.abort() 

1738 else: 

1739 self._file.close() 

1740 

1741 def get(self) -> bytes | None: 

1742 """Get the current value of the ref.""" 

1743 if not self._file: 

1744 raise RuntimeError("locked_ref not in context") 

1745 

1746 assert self._realname is not None 

1747 current_ref = self._refs_container.read_loose_ref(self._realname) 

1748 if current_ref is None: 

1749 current_ref = self._refs_container.get_packed_refs().get( 

1750 self._realname, None 

1751 ) 

1752 return current_ref 

1753 

1754 def ensure_equals(self, expected_value: bytes | None) -> bool: 

1755 """Ensure the ref currently equals the expected value. 

1756 

1757 Args: 

1758 expected_value: The expected current value of the ref 

1759 Returns: 

1760 True if the ref equals the expected value, False otherwise 

1761 """ 

1762 current_value = self.get() 

1763 return current_value == expected_value 

1764 

1765 def set(self, new_ref: bytes) -> None: 

1766 """Set the ref to a new value. 

1767 

1768 Args: 

1769 new_ref: The new SHA1 or symbolic ref value 

1770 """ 

1771 if not self._file: 

1772 raise RuntimeError("locked_ref not in context") 

1773 

1774 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)): 

1775 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref") 

1776 

1777 self._file.seek(0) 

1778 self._file.truncate() 

1779 self._file.write(new_ref + b"\n") 

1780 self._deleted = False 

1781 

1782 def set_symbolic_ref(self, target: Ref) -> None: 

1783 """Make this ref point at another ref. 

1784 

1785 Args: 

1786 target: Name of the ref to point at 

1787 """ 

1788 if not self._file: 

1789 raise RuntimeError("locked_ref not in context") 

1790 

1791 self._refs_container._check_refname(target) 

1792 self._file.seek(0) 

1793 self._file.truncate() 

1794 self._file.write(SYMREF + target + b"\n") 

1795 self._deleted = False 

1796 

1797 def delete(self) -> None: 

1798 """Delete the ref file while holding the lock.""" 

1799 if not self._file: 

1800 raise RuntimeError("locked_ref not in context") 

1801 

1802 # Delete the actual ref file while holding the lock 

1803 if self._realname: 

1804 filename = self._refs_container.refpath(self._realname) 

1805 try: 

1806 if os.path.lexists(filename): 

1807 os.remove(filename) 

1808 except FileNotFoundError: 

1809 pass 

1810 self._refs_container._remove_packed_ref(self._realname) 

1811 

1812 self._deleted = True 

1813 

1814 

1815class NamespacedRefsContainer(RefsContainer): 

1816 """Wrapper that adds namespace prefix to all ref operations. 

1817 

1818 This implements Git's GIT_NAMESPACE feature, which stores refs under 

1819 refs/namespaces/<namespace>/ and filters operations to only show refs 

1820 within that namespace. 

1821 

1822 Example: 

1823 With namespace "foo", a ref "refs/heads/master" is stored as 

1824 "refs/namespaces/foo/refs/heads/master" in the underlying container. 

1825 """ 

1826 

1827 def __init__(self, refs: RefsContainer, namespace: bytes) -> None: 

1828 """Initialize NamespacedRefsContainer. 

1829 

1830 Args: 

1831 refs: The underlying refs container to wrap 

1832 namespace: The namespace prefix (e.g., b"foo" or b"foo/bar") 

1833 """ 

1834 super().__init__(logger=refs._logger) 

1835 self._refs = refs 

1836 # Build namespace prefix: refs/namespaces/<namespace>/ 

1837 # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/ 

1838 namespace_parts = namespace.split(b"/") 

1839 self._namespace_prefix = b"" 

1840 for part in namespace_parts: 

1841 self._namespace_prefix += b"refs/namespaces/" + part + b"/" 

1842 

1843 def _apply_namespace(self, name: bytes) -> bytes: 

1844 """Apply namespace prefix to a ref name.""" 

1845 # HEAD and other special refs are not namespaced 

1846 if name == HEADREF or not name.startswith(b"refs/"): 

1847 return name 

1848 return self._namespace_prefix + name 

1849 

1850 def _strip_namespace(self, name: bytes) -> bytes | None: 

1851 """Remove namespace prefix from a ref name. 

1852 

1853 Returns None if the ref is not in our namespace. 

1854 """ 

1855 # HEAD and other special refs are not namespaced 

1856 if name == HEADREF or not name.startswith(b"refs/"): 

1857 return name 

1858 if name.startswith(self._namespace_prefix): 

1859 return name[len(self._namespace_prefix) :] 

1860 return None 

1861 

1862 def allkeys(self) -> set[Ref]: 

1863 """Return all reference keys in this namespace.""" 

1864 keys: set[Ref] = set() 

1865 for key in self._refs.allkeys(): 

1866 stripped = self._strip_namespace(key) 

1867 if stripped is not None: 

1868 keys.add(Ref(stripped)) 

1869 return keys 

1870 

1871 def read_loose_ref(self, name: Ref) -> bytes | None: 

1872 """Read a loose reference.""" 

1873 return self._refs.read_loose_ref(Ref(self._apply_namespace(name))) 

1874 

1875 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

1876 """Get packed refs within this namespace.""" 

1877 packed: dict[Ref, ObjectID] = {} 

1878 for name, value in self._refs.get_packed_refs().items(): 

1879 stripped = self._strip_namespace(name) 

1880 if stripped is not None: 

1881 packed[Ref(stripped)] = value 

1882 return packed 

1883 

1884 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

1885 """Add packed refs with namespace prefix.""" 

1886 namespaced_refs: dict[Ref, ObjectID | None] = { 

1887 Ref(self._apply_namespace(name)): value for name, value in new_refs.items() 

1888 } 

1889 self._refs.add_packed_refs(namespaced_refs) 

1890 

1891 def get_peeled(self, name: Ref) -> ObjectID | None: 

1892 """Return the cached peeled value of a ref.""" 

1893 return self._refs.get_peeled(Ref(self._apply_namespace(name))) 

1894 

1895 def set_symbolic_ref( 

1896 self, 

1897 name: Ref, 

1898 other: Ref, 

1899 committer: bytes | None = None, 

1900 timestamp: int | None = None, 

1901 timezone: int | None = None, 

1902 message: bytes | None = None, 

1903 ) -> None: 

1904 """Make a ref point at another ref.""" 

1905 self._refs.set_symbolic_ref( 

1906 Ref(self._apply_namespace(name)), 

1907 Ref(self._apply_namespace(other)), 

1908 committer=committer, 

1909 timestamp=timestamp, 

1910 timezone=timezone, 

1911 message=message, 

1912 ) 

1913 

1914 def set_if_equals( 

1915 self, 

1916 name: Ref, 

1917 old_ref: ObjectID | None, 

1918 new_ref: ObjectID, 

1919 committer: bytes | None = None, 

1920 timestamp: int | None = None, 

1921 timezone: int | None = None, 

1922 message: bytes | None = None, 

1923 ) -> bool: 

1924 """Set a refname to new_ref only if it currently equals old_ref.""" 

1925 return self._refs.set_if_equals( 

1926 Ref(self._apply_namespace(name)), 

1927 old_ref, 

1928 new_ref, 

1929 committer=committer, 

1930 timestamp=timestamp, 

1931 timezone=timezone, 

1932 message=message, 

1933 ) 

1934 

1935 def add_if_new( 

1936 self, 

1937 name: Ref, 

1938 ref: ObjectID, 

1939 committer: bytes | None = None, 

1940 timestamp: int | None = None, 

1941 timezone: int | None = None, 

1942 message: bytes | None = None, 

1943 ) -> bool: 

1944 """Add a new reference only if it does not already exist.""" 

1945 return self._refs.add_if_new( 

1946 Ref(self._apply_namespace(name)), 

1947 ref, 

1948 committer=committer, 

1949 timestamp=timestamp, 

1950 timezone=timezone, 

1951 message=message, 

1952 ) 

1953 

1954 def remove_if_equals( 

1955 self, 

1956 name: Ref, 

1957 old_ref: ObjectID | None, 

1958 committer: bytes | None = None, 

1959 timestamp: int | None = None, 

1960 timezone: int | None = None, 

1961 message: bytes | None = None, 

1962 ) -> bool: 

1963 """Remove a refname only if it currently equals old_ref.""" 

1964 return self._refs.remove_if_equals( 

1965 Ref(self._apply_namespace(name)), 

1966 old_ref, 

1967 committer=committer, 

1968 timestamp=timestamp, 

1969 timezone=timezone, 

1970 message=message, 

1971 ) 

1972 

1973 def pack_refs(self, all: bool = False) -> None: 

1974 """Pack loose refs into packed-refs file. 

1975 

1976 Note: This packs all refs in the underlying container, not just 

1977 those in the namespace. 

1978 """ 

1979 self._refs.pack_refs(all=all) 

1980 

1981 

1982def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T: 

1983 """Filter refs to only include those with a given prefix. 

1984 

1985 Args: 

1986 refs: A dictionary of refs. 

1987 prefixes: The prefixes to filter by. 

1988 """ 

1989 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)} 

1990 return filtered 

1991 

1992 

1993def is_per_worktree_ref(ref: bytes) -> bool: 

1994 """Returns whether a reference is stored per worktree or not. 

1995 

1996 Per-worktree references are: 

1997 - all pseudorefs, e.g. HEAD 

1998 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/" 

1999 

2000 All refs starting with "refs/" are shared, except for the ones listed above. 

2001 

2002 See https://git-scm.com/docs/git-worktree#_refs. 

2003 """ 

2004 return not ref.startswith(b"refs/") or ref.startswith( 

2005 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/") 

2006 )