Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

797 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import types 

27import warnings 

28from collections.abc import Callable, Iterable, Iterator, Mapping 

29from contextlib import suppress 

30from typing import ( 

31 IO, 

32 TYPE_CHECKING, 

33 Any, 

34 BinaryIO, 

35 TypeVar, 

36) 

37 

38if TYPE_CHECKING: 

39 from .file import _GitFile 

40 

41from .errors import PackedRefsException, RefFormatError 

42from .file import GitFile, ensure_dir_exists 

43from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

44from .pack import ObjectContainer 

45 

46Ref = bytes 

47 

48HEADREF = b"HEAD" 

49SYMREF = b"ref: " 

50LOCAL_BRANCH_PREFIX = b"refs/heads/" 

51LOCAL_TAG_PREFIX = b"refs/tags/" 

52LOCAL_REMOTE_PREFIX = b"refs/remotes/" 

53LOCAL_NOTES_PREFIX = b"refs/notes/" 

54LOCAL_REPLACE_PREFIX = b"refs/replace/" 

55BAD_REF_CHARS = set(b"\177 ~^:?*[") 

56PEELED_TAG_SUFFIX = b"^{}" 

57 

58# For backwards compatibility 

59ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX 

60 

61 

62class SymrefLoop(Exception): 

63 """There is a loop between one or more symrefs.""" 

64 

65 def __init__(self, ref: bytes, depth: int) -> None: 

66 """Initialize SymrefLoop exception.""" 

67 self.ref = ref 

68 self.depth = depth 

69 

70 

71def parse_symref_value(contents: bytes) -> bytes: 

72 """Parse a symref value. 

73 

74 Args: 

75 contents: Contents to parse 

76 Returns: Destination 

77 """ 

78 if contents.startswith(SYMREF): 

79 return contents[len(SYMREF) :].rstrip(b"\r\n") 

80 raise ValueError(contents) 

81 

82 

83def check_ref_format(refname: Ref) -> bool: 

84 """Check if a refname is correctly formatted. 

85 

86 Implements all the same rules as git-check-ref-format[1]. 

87 

88 [1] 

89 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html 

90 

91 Args: 

92 refname: The refname to check 

93 Returns: True if refname is valid, False otherwise 

94 """ 

95 # These could be combined into one big expression, but are listed 

96 # separately to parallel [1]. 

97 if b"/." in refname or refname.startswith(b"."): 

98 return False 

99 if b"/" not in refname: 

100 return False 

101 if b".." in refname: 

102 return False 

103 for i, c in enumerate(refname): 

104 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS: 

105 return False 

106 if refname[-1] in b"/.": 

107 return False 

108 if refname.endswith(b".lock"): 

109 return False 

110 if b"@{" in refname: 

111 return False 

112 if b"\\" in refname: 

113 return False 

114 return True 

115 

116 

117def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]: 

118 """Parse a remote ref into remote name and branch name. 

119 

120 Args: 

121 ref: Remote ref like b"refs/remotes/origin/main" 

122 

123 Returns: 

124 Tuple of (remote_name, branch_name) 

125 

126 Raises: 

127 ValueError: If ref is not a valid remote ref 

128 """ 

129 if not ref.startswith(LOCAL_REMOTE_PREFIX): 

130 raise ValueError(f"Not a remote ref: {ref!r}") 

131 

132 # Remove the prefix 

133 remainder = ref[len(LOCAL_REMOTE_PREFIX) :] 

134 

135 # Split into remote name and branch name 

136 parts = remainder.split(b"/", 1) 

137 if len(parts) != 2: 

138 raise ValueError(f"Invalid remote ref format: {ref!r}") 

139 

140 remote_name, branch_name = parts 

141 return (remote_name, branch_name) 

142 

143 

144class RefsContainer: 

145 """A container for refs.""" 

146 

147 def __init__( 

148 self, 

149 logger: Callable[ 

150 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

151 ] 

152 | None = None, 

153 ) -> None: 

154 """Initialize RefsContainer with optional logger function.""" 

155 self._logger = logger 

156 

157 def _log( 

158 self, 

159 ref: bytes, 

160 old_sha: bytes | None, 

161 new_sha: bytes | None, 

162 committer: bytes | None = None, 

163 timestamp: int | None = None, 

164 timezone: int | None = None, 

165 message: bytes | None = None, 

166 ) -> None: 

167 if self._logger is None: 

168 return 

169 if message is None: 

170 return 

171 # Use ZERO_SHA for None values, matching git behavior 

172 if old_sha is None: 

173 old_sha = ZERO_SHA 

174 if new_sha is None: 

175 new_sha = ZERO_SHA 

176 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message) 

177 

178 def set_symbolic_ref( 

179 self, 

180 name: bytes, 

181 other: bytes, 

182 committer: bytes | None = None, 

183 timestamp: int | None = None, 

184 timezone: int | None = None, 

185 message: bytes | None = None, 

186 ) -> None: 

187 """Make a ref point at another ref. 

188 

189 Args: 

190 name: Name of the ref to set 

191 other: Name of the ref to point at 

192 committer: Optional committer name/email 

193 timestamp: Optional timestamp 

194 timezone: Optional timezone 

195 message: Optional message 

196 """ 

197 raise NotImplementedError(self.set_symbolic_ref) 

198 

199 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

200 """Get contents of the packed-refs file. 

201 

202 Returns: Dictionary mapping ref names to SHA1s 

203 

204 Note: Will return an empty dictionary when no packed-refs file is 

205 present. 

206 """ 

207 raise NotImplementedError(self.get_packed_refs) 

208 

209 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

210 """Add the given refs as packed refs. 

211 

212 Args: 

213 new_refs: A mapping of ref names to targets; if a target is None that 

214 means remove the ref 

215 """ 

216 raise NotImplementedError(self.add_packed_refs) 

217 

218 def get_peeled(self, name: bytes) -> ObjectID | None: 

219 """Return the cached peeled value of a ref, if available. 

220 

221 Args: 

222 name: Name of the ref to peel 

223 Returns: The peeled value of the ref. If the ref is known not point to 

224 a tag, this will be the SHA the ref refers to. If the ref may point 

225 to a tag, but no cached information is available, None is returned. 

226 """ 

227 return None 

228 

229 def import_refs( 

230 self, 

231 base: Ref, 

232 other: Mapping[Ref, ObjectID], 

233 committer: bytes | None = None, 

234 timestamp: bytes | None = None, 

235 timezone: bytes | None = None, 

236 message: bytes | None = None, 

237 prune: bool = False, 

238 ) -> None: 

239 """Import refs from another repository. 

240 

241 Args: 

242 base: Base ref to import into (e.g., b'refs/remotes/origin') 

243 other: Dictionary of refs to import 

244 committer: Optional committer for reflog 

245 timestamp: Optional timestamp for reflog 

246 timezone: Optional timezone for reflog 

247 message: Optional message for reflog 

248 prune: If True, remove refs not in other 

249 """ 

250 if prune: 

251 to_delete = set(self.subkeys(base)) 

252 else: 

253 to_delete = set() 

254 for name, value in other.items(): 

255 if value is None: 

256 to_delete.add(name) 

257 else: 

258 self.set_if_equals( 

259 b"/".join((base, name)), None, value, message=message 

260 ) 

261 if to_delete: 

262 try: 

263 to_delete.remove(name) 

264 except KeyError: 

265 pass 

266 for ref in to_delete: 

267 self.remove_if_equals(b"/".join((base, ref)), None, message=message) 

268 

269 def allkeys(self) -> set[Ref]: 

270 """All refs present in this container.""" 

271 raise NotImplementedError(self.allkeys) 

272 

273 def __iter__(self) -> Iterator[Ref]: 

274 """Iterate over all reference keys.""" 

275 return iter(self.allkeys()) 

276 

277 def keys(self, base: bytes | None = None) -> set[bytes]: 

278 """Refs present in this container. 

279 

280 Args: 

281 base: An optional base to return refs under. 

282 Returns: An unsorted set of valid refs in this container, including 

283 packed refs. 

284 """ 

285 if base is not None: 

286 return self.subkeys(base) 

287 else: 

288 return self.allkeys() 

289 

290 def subkeys(self, base: bytes) -> set[bytes]: 

291 """Refs present in this container under a base. 

292 

293 Args: 

294 base: The base to return refs under. 

295 Returns: A set of valid refs in this container under the base; the base 

296 prefix is stripped from the ref names returned. 

297 """ 

298 keys = set() 

299 base_len = len(base) + 1 

300 for refname in self.allkeys(): 

301 if refname.startswith(base): 

302 keys.add(refname[base_len:]) 

303 return keys 

304 

305 def as_dict(self, base: bytes | None = None) -> dict[Ref, ObjectID]: 

306 """Return the contents of this container as a dictionary.""" 

307 ret = {} 

308 keys = self.keys(base) 

309 if base is None: 

310 base = b"" 

311 else: 

312 base = base.rstrip(b"/") 

313 for key in keys: 

314 try: 

315 ret[key] = self[(base + b"/" + key).strip(b"/")] 

316 except (SymrefLoop, KeyError): 

317 continue # Unable to resolve 

318 

319 return ret 

320 

321 def _check_refname(self, name: bytes) -> None: 

322 """Ensure a refname is valid and lives in refs or is HEAD. 

323 

324 HEAD is not a valid refname according to git-check-ref-format, but this 

325 class needs to be able to touch HEAD. Also, check_ref_format expects 

326 refnames without the leading 'refs/', but this class requires that 

327 so it cannot touch anything outside the refs dir (or HEAD). 

328 

329 Args: 

330 name: The name of the reference. 

331 

332 Raises: 

333 KeyError: if a refname is not HEAD or is otherwise not valid. 

334 """ 

335 if name in (HEADREF, b"refs/stash"): 

336 return 

337 if not name.startswith(b"refs/") or not check_ref_format(name[5:]): 

338 raise RefFormatError(name) 

339 

340 def read_ref(self, refname: bytes) -> bytes | None: 

341 """Read a reference without following any references. 

342 

343 Args: 

344 refname: The name of the reference 

345 Returns: The contents of the ref file, or None if it does 

346 not exist. 

347 """ 

348 contents = self.read_loose_ref(refname) 

349 if not contents: 

350 contents = self.get_packed_refs().get(refname, None) 

351 return contents 

352 

353 def read_loose_ref(self, name: bytes) -> bytes | None: 

354 """Read a loose reference and return its contents. 

355 

356 Args: 

357 name: the refname to read 

358 Returns: The contents of the ref file, or None if it does 

359 not exist. 

360 """ 

361 raise NotImplementedError(self.read_loose_ref) 

362 

363 def follow(self, name: bytes) -> tuple[list[bytes], bytes | None]: 

364 """Follow a reference name. 

365 

366 Returns: a tuple of (refnames, sha), wheres refnames are the names of 

367 references in the chain 

368 """ 

369 contents: bytes | None = SYMREF + name 

370 depth = 0 

371 refnames = [] 

372 while contents and contents.startswith(SYMREF): 

373 refname = contents[len(SYMREF) :] 

374 refnames.append(refname) 

375 contents = self.read_ref(refname) 

376 if not contents: 

377 break 

378 depth += 1 

379 if depth > 5: 

380 raise SymrefLoop(name, depth) 

381 return refnames, contents 

382 

383 def __contains__(self, refname: bytes) -> bool: 

384 """Check if a reference exists.""" 

385 if self.read_ref(refname): 

386 return True 

387 return False 

388 

389 def __getitem__(self, name: bytes) -> ObjectID: 

390 """Get the SHA1 for a reference name. 

391 

392 This method follows all symbolic references. 

393 """ 

394 _, sha = self.follow(name) 

395 if sha is None: 

396 raise KeyError(name) 

397 return sha 

398 

399 def set_if_equals( 

400 self, 

401 name: bytes, 

402 old_ref: bytes | None, 

403 new_ref: bytes, 

404 committer: bytes | None = None, 

405 timestamp: int | None = None, 

406 timezone: int | None = None, 

407 message: bytes | None = None, 

408 ) -> bool: 

409 """Set a refname to new_ref only if it currently equals old_ref. 

410 

411 This method follows all symbolic references if applicable for the 

412 subclass, and can be used to perform an atomic compare-and-swap 

413 operation. 

414 

415 Args: 

416 name: The refname to set. 

417 old_ref: The old sha the refname must refer to, or None to set 

418 unconditionally. 

419 new_ref: The new sha the refname will refer to. 

420 committer: Optional committer name/email 

421 timestamp: Optional timestamp 

422 timezone: Optional timezone 

423 message: Message for reflog 

424 Returns: True if the set was successful, False otherwise. 

425 """ 

426 raise NotImplementedError(self.set_if_equals) 

427 

428 def add_if_new( 

429 self, 

430 name: bytes, 

431 ref: bytes, 

432 committer: bytes | None = None, 

433 timestamp: int | None = None, 

434 timezone: int | None = None, 

435 message: bytes | None = None, 

436 ) -> bool: 

437 """Add a new reference only if it does not already exist. 

438 

439 Args: 

440 name: Ref name 

441 ref: Ref value 

442 committer: Optional committer name/email 

443 timestamp: Optional timestamp 

444 timezone: Optional timezone 

445 message: Optional message for reflog 

446 """ 

447 raise NotImplementedError(self.add_if_new) 

448 

449 def __setitem__(self, name: bytes, ref: bytes) -> None: 

450 """Set a reference name to point to the given SHA1. 

451 

452 This method follows all symbolic references if applicable for the 

453 subclass. 

454 

455 Note: This method unconditionally overwrites the contents of a 

456 reference. To update atomically only if the reference has not 

457 changed, use set_if_equals(). 

458 

459 Args: 

460 name: The refname to set. 

461 ref: The new sha the refname will refer to. 

462 """ 

463 if not (valid_hexsha(ref) or ref.startswith(SYMREF)): 

464 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref") 

465 self.set_if_equals(name, None, ref) 

466 

467 def remove_if_equals( 

468 self, 

469 name: bytes, 

470 old_ref: bytes | None, 

471 committer: bytes | None = None, 

472 timestamp: int | None = None, 

473 timezone: int | None = None, 

474 message: bytes | None = None, 

475 ) -> bool: 

476 """Remove a refname only if it currently equals old_ref. 

477 

478 This method does not follow symbolic references, even if applicable for 

479 the subclass. It can be used to perform an atomic compare-and-delete 

480 operation. 

481 

482 Args: 

483 name: The refname to delete. 

484 old_ref: The old sha the refname must refer to, or None to 

485 delete unconditionally. 

486 committer: Optional committer name/email 

487 timestamp: Optional timestamp 

488 timezone: Optional timezone 

489 message: Message for reflog 

490 Returns: True if the delete was successful, False otherwise. 

491 """ 

492 raise NotImplementedError(self.remove_if_equals) 

493 

494 def __delitem__(self, name: bytes) -> None: 

495 """Remove a refname. 

496 

497 This method does not follow symbolic references, even if applicable for 

498 the subclass. 

499 

500 Note: This method unconditionally deletes the contents of a reference. 

501 To delete atomically only if the reference has not changed, use 

502 remove_if_equals(). 

503 

504 Args: 

505 name: The refname to delete. 

506 """ 

507 self.remove_if_equals(name, None) 

508 

509 def get_symrefs(self) -> dict[bytes, bytes]: 

510 """Get a dict with all symrefs in this container. 

511 

512 Returns: Dictionary mapping source ref to target ref 

513 """ 

514 ret = {} 

515 for src in self.allkeys(): 

516 try: 

517 ref_value = self.read_ref(src) 

518 assert ref_value is not None 

519 dst = parse_symref_value(ref_value) 

520 except ValueError: 

521 pass 

522 else: 

523 ret[src] = dst 

524 return ret 

525 

526 def pack_refs(self, all: bool = False) -> None: 

527 """Pack loose refs into packed-refs file. 

528 

529 Args: 

530 all: If True, pack all refs. If False, only pack tags. 

531 """ 

532 raise NotImplementedError(self.pack_refs) 

533 

534 

535class DictRefsContainer(RefsContainer): 

536 """RefsContainer backed by a simple dict. 

537 

538 This container does not support symbolic or packed references and is not 

539 threadsafe. 

540 """ 

541 

542 def __init__( 

543 self, 

544 refs: dict[bytes, bytes], 

545 logger: Callable[ 

546 [ 

547 bytes, 

548 bytes | None, 

549 bytes | None, 

550 bytes | None, 

551 int | None, 

552 int | None, 

553 bytes | None, 

554 ], 

555 None, 

556 ] 

557 | None = None, 

558 ) -> None: 

559 """Initialize DictRefsContainer with refs dictionary and optional logger.""" 

560 super().__init__(logger=logger) 

561 self._refs = refs 

562 self._peeled: dict[bytes, ObjectID] = {} 

563 self._watchers: set[Any] = set() 

564 

565 def allkeys(self) -> set[bytes]: 

566 """Return all reference keys.""" 

567 return set(self._refs.keys()) 

568 

569 def read_loose_ref(self, name: bytes) -> bytes | None: 

570 """Read a loose reference.""" 

571 return self._refs.get(name, None) 

572 

573 def get_packed_refs(self) -> dict[bytes, bytes]: 

574 """Get packed references.""" 

575 return {} 

576 

577 def _notify(self, ref: bytes, newsha: bytes | None) -> None: 

578 for watcher in self._watchers: 

579 watcher._notify((ref, newsha)) 

580 

581 def set_symbolic_ref( 

582 self, 

583 name: Ref, 

584 other: Ref, 

585 committer: bytes | None = None, 

586 timestamp: int | None = None, 

587 timezone: int | None = None, 

588 message: bytes | None = None, 

589 ) -> None: 

590 """Make a ref point at another ref. 

591 

592 Args: 

593 name: Name of the ref to set 

594 other: Name of the ref to point at 

595 committer: Optional committer name for reflog 

596 timestamp: Optional timestamp for reflog 

597 timezone: Optional timezone for reflog 

598 message: Optional message for reflog 

599 """ 

600 old = self.follow(name)[-1] 

601 new = SYMREF + other 

602 self._refs[name] = new 

603 self._notify(name, new) 

604 self._log( 

605 name, 

606 old, 

607 new, 

608 committer=committer, 

609 timestamp=timestamp, 

610 timezone=timezone, 

611 message=message, 

612 ) 

613 

614 def set_if_equals( 

615 self, 

616 name: bytes, 

617 old_ref: bytes | None, 

618 new_ref: bytes, 

619 committer: bytes | None = None, 

620 timestamp: int | None = None, 

621 timezone: int | None = None, 

622 message: bytes | None = None, 

623 ) -> bool: 

624 """Set a refname to new_ref only if it currently equals old_ref. 

625 

626 This method follows all symbolic references, and can be used to perform 

627 an atomic compare-and-swap operation. 

628 

629 Args: 

630 name: The refname to set. 

631 old_ref: The old sha the refname must refer to, or None to set 

632 unconditionally. 

633 new_ref: The new sha the refname will refer to. 

634 committer: Optional committer name for reflog 

635 timestamp: Optional timestamp for reflog 

636 timezone: Optional timezone for reflog 

637 message: Optional message for reflog 

638 

639 Returns: 

640 True if the set was successful, False otherwise. 

641 """ 

642 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

643 return False 

644 # Only update the specific ref requested, not the whole chain 

645 self._check_refname(name) 

646 old = self._refs.get(name) 

647 self._refs[name] = new_ref 

648 self._notify(name, new_ref) 

649 self._log( 

650 name, 

651 old, 

652 new_ref, 

653 committer=committer, 

654 timestamp=timestamp, 

655 timezone=timezone, 

656 message=message, 

657 ) 

658 return True 

659 

660 def add_if_new( 

661 self, 

662 name: Ref, 

663 ref: ObjectID, 

664 committer: bytes | None = None, 

665 timestamp: int | None = None, 

666 timezone: int | None = None, 

667 message: bytes | None = None, 

668 ) -> bool: 

669 """Add a new reference only if it does not already exist. 

670 

671 Args: 

672 name: Ref name 

673 ref: Ref value 

674 committer: Optional committer name for reflog 

675 timestamp: Optional timestamp for reflog 

676 timezone: Optional timezone for reflog 

677 message: Optional message for reflog 

678 

679 Returns: 

680 True if the add was successful, False otherwise. 

681 """ 

682 if name in self._refs: 

683 return False 

684 self._refs[name] = ref 

685 self._notify(name, ref) 

686 self._log( 

687 name, 

688 None, 

689 ref, 

690 committer=committer, 

691 timestamp=timestamp, 

692 timezone=timezone, 

693 message=message, 

694 ) 

695 return True 

696 

697 def remove_if_equals( 

698 self, 

699 name: bytes, 

700 old_ref: bytes | None, 

701 committer: bytes | None = None, 

702 timestamp: int | None = None, 

703 timezone: int | None = None, 

704 message: bytes | None = None, 

705 ) -> bool: 

706 """Remove a refname only if it currently equals old_ref. 

707 

708 This method does not follow symbolic references. It can be used to 

709 perform an atomic compare-and-delete operation. 

710 

711 Args: 

712 name: The refname to delete. 

713 old_ref: The old sha the refname must refer to, or None to 

714 delete unconditionally. 

715 committer: Optional committer name for reflog 

716 timestamp: Optional timestamp for reflog 

717 timezone: Optional timezone for reflog 

718 message: Optional message for reflog 

719 

720 Returns: 

721 True if the delete was successful, False otherwise. 

722 """ 

723 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

724 return False 

725 try: 

726 old = self._refs.pop(name) 

727 except KeyError: 

728 pass 

729 else: 

730 self._notify(name, None) 

731 self._log( 

732 name, 

733 old, 

734 None, 

735 committer=committer, 

736 timestamp=timestamp, 

737 timezone=timezone, 

738 message=message, 

739 ) 

740 return True 

741 

742 def get_peeled(self, name: bytes) -> bytes | None: 

743 """Get peeled version of a reference.""" 

744 return self._peeled.get(name) 

745 

746 def _update(self, refs: Mapping[bytes, bytes]) -> None: 

747 """Update multiple refs; intended only for testing.""" 

748 # TODO(dborowitz): replace this with a public function that uses 

749 # set_if_equal. 

750 for ref, sha in refs.items(): 

751 self.set_if_equals(ref, None, sha) 

752 

753 def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None: 

754 """Update cached peeled refs; intended only for testing.""" 

755 self._peeled.update(peeled) 

756 

757 

758class InfoRefsContainer(RefsContainer): 

759 """Refs container that reads refs from a info/refs file.""" 

760 

761 def __init__(self, f: BinaryIO) -> None: 

762 """Initialize InfoRefsContainer from info/refs file.""" 

763 self._refs: dict[bytes, bytes] = {} 

764 self._peeled: dict[bytes, bytes] = {} 

765 refs = read_info_refs(f) 

766 (self._refs, self._peeled) = split_peeled_refs(refs) 

767 

768 def allkeys(self) -> set[bytes]: 

769 """Return all reference keys.""" 

770 return set(self._refs.keys()) 

771 

772 def read_loose_ref(self, name: bytes) -> bytes | None: 

773 """Read a loose reference.""" 

774 return self._refs.get(name, None) 

775 

776 def get_packed_refs(self) -> dict[bytes, bytes]: 

777 """Get packed references.""" 

778 return {} 

779 

780 def get_peeled(self, name: bytes) -> bytes | None: 

781 """Get peeled version of a reference.""" 

782 try: 

783 return self._peeled[name] 

784 except KeyError: 

785 return self._refs[name] 

786 

787 

788class DiskRefsContainer(RefsContainer): 

789 """Refs container that reads refs from disk.""" 

790 

791 def __init__( 

792 self, 

793 path: str | bytes | os.PathLike[str], 

794 worktree_path: str | bytes | os.PathLike[str] | None = None, 

795 logger: Callable[ 

796 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

797 ] 

798 | None = None, 

799 ) -> None: 

800 """Initialize DiskRefsContainer.""" 

801 super().__init__(logger=logger) 

802 # Convert path-like objects to strings, then to bytes for Git compatibility 

803 self.path = os.fsencode(os.fspath(path)) 

804 if worktree_path is None: 

805 self.worktree_path = self.path 

806 else: 

807 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

808 self._packed_refs: dict[bytes, bytes] | None = None 

809 self._peeled_refs: dict[bytes, bytes] | None = None 

810 

811 def __repr__(self) -> str: 

812 """Return string representation of DiskRefsContainer.""" 

813 return f"{self.__class__.__name__}({self.path!r})" 

814 

815 def _iter_dir( 

816 self, 

817 path: bytes, 

818 base: bytes, 

819 dir_filter: Callable[[bytes], bool] | None = None, 

820 ) -> Iterator[bytes]: 

821 refspath = os.path.join(path, base.rstrip(b"/")) 

822 prefix_len = len(os.path.join(path, b"")) 

823 

824 for root, dirs, files in os.walk(refspath): 

825 directory = root[prefix_len:] 

826 if os.path.sep != "/": 

827 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

828 if dir_filter is not None: 

829 dirs[:] = [ 

830 d for d in dirs if dir_filter(b"/".join([directory, d, b""])) 

831 ] 

832 

833 for filename in files: 

834 refname = b"/".join([directory, filename]) 

835 if check_ref_format(refname): 

836 yield refname 

837 

838 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]: 

839 base = base.rstrip(b"/") + b"/" 

840 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = [] 

841 if base != b"refs/": 

842 path = self.worktree_path if is_per_worktree_ref(base) else self.path 

843 search_paths.append((path, None)) 

844 elif self.worktree_path == self.path: 

845 # Iterate through all the refs from the main worktree 

846 search_paths.append((self.path, None)) 

847 else: 

848 # Iterate through all the shared refs from the commondir, excluding per-worktree refs 

849 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r))) 

850 # Iterate through all the per-worktree refs from the worktree's gitdir 

851 search_paths.append((self.worktree_path, is_per_worktree_ref)) 

852 

853 for path, dir_filter in search_paths: 

854 yield from self._iter_dir(path, base, dir_filter=dir_filter) 

855 

856 def subkeys(self, base: bytes) -> set[bytes]: 

857 """Return subkeys under a given base reference path.""" 

858 subkeys = set() 

859 

860 for key in self._iter_loose_refs(base): 

861 if key.startswith(base): 

862 subkeys.add(key[len(base) :].strip(b"/")) 

863 

864 for key in self.get_packed_refs(): 

865 if key.startswith(base): 

866 subkeys.add(key[len(base) :].strip(b"/")) 

867 return subkeys 

868 

869 def allkeys(self) -> set[bytes]: 

870 """Return all reference keys.""" 

871 allkeys = set() 

872 if os.path.exists(self.refpath(HEADREF)): 

873 allkeys.add(HEADREF) 

874 

875 allkeys.update(self._iter_loose_refs()) 

876 allkeys.update(self.get_packed_refs()) 

877 return allkeys 

878 

879 def refpath(self, name: bytes) -> bytes: 

880 """Return the disk path of a ref.""" 

881 path = name 

882 if os.path.sep != "/": 

883 path = path.replace(b"/", os.fsencode(os.path.sep)) 

884 

885 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path 

886 return os.path.join(root_dir, path) 

887 

888 def get_packed_refs(self) -> dict[bytes, bytes]: 

889 """Get contents of the packed-refs file. 

890 

891 Returns: Dictionary mapping ref names to SHA1s 

892 

893 Note: Will return an empty dictionary when no packed-refs file is 

894 present. 

895 """ 

896 # TODO: invalidate the cache on repacking 

897 if self._packed_refs is None: 

898 # set both to empty because we want _peeled_refs to be 

899 # None if and only if _packed_refs is also None. 

900 self._packed_refs = {} 

901 self._peeled_refs = {} 

902 path = os.path.join(self.path, b"packed-refs") 

903 try: 

904 f = GitFile(path, "rb") 

905 except FileNotFoundError: 

906 return {} 

907 with f: 

908 first_line = next(iter(f)).rstrip() 

909 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

910 for sha, name, peeled in read_packed_refs_with_peeled(f): 

911 self._packed_refs[name] = sha 

912 if peeled: 

913 self._peeled_refs[name] = peeled 

914 else: 

915 f.seek(0) 

916 for sha, name in read_packed_refs(f): 

917 self._packed_refs[name] = sha 

918 return self._packed_refs 

919 

920 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

921 """Add the given refs as packed refs. 

922 

923 Args: 

924 new_refs: A mapping of ref names to targets; if a target is None that 

925 means remove the ref 

926 """ 

927 if not new_refs: 

928 return 

929 

930 path = os.path.join(self.path, b"packed-refs") 

931 

932 with GitFile(path, "wb") as f: 

933 # reread cached refs from disk, while holding the lock 

934 packed_refs = self.get_packed_refs().copy() 

935 

936 for ref, target in new_refs.items(): 

937 # sanity check 

938 if ref == HEADREF: 

939 raise ValueError("cannot pack HEAD") 

940 

941 # remove any loose refs pointing to this one -- please 

942 # note that this bypasses remove_if_equals as we don't 

943 # want to affect packed refs in here 

944 with suppress(OSError): 

945 os.remove(self.refpath(ref)) 

946 

947 if target is not None: 

948 packed_refs[ref] = target 

949 else: 

950 packed_refs.pop(ref, None) 

951 

952 write_packed_refs(f, packed_refs, self._peeled_refs) 

953 

954 self._packed_refs = packed_refs 

955 

956 def get_peeled(self, name: bytes) -> bytes | None: 

957 """Return the cached peeled value of a ref, if available. 

958 

959 Args: 

960 name: Name of the ref to peel 

961 Returns: The peeled value of the ref. If the ref is known not point to 

962 a tag, this will be the SHA the ref refers to. If the ref may point 

963 to a tag, but no cached information is available, None is returned. 

964 """ 

965 self.get_packed_refs() 

966 if ( 

967 self._peeled_refs is None 

968 or self._packed_refs is None 

969 or name not in self._packed_refs 

970 ): 

971 # No cache: no peeled refs were read, or this ref is loose 

972 return None 

973 if name in self._peeled_refs: 

974 return self._peeled_refs[name] 

975 else: 

976 # Known not peelable 

977 return self[name] 

978 

979 def read_loose_ref(self, name: bytes) -> bytes | None: 

980 """Read a reference file and return its contents. 

981 

982 If the reference file a symbolic reference, only read the first line of 

983 the file. Otherwise, only read the first 40 bytes. 

984 

985 Args: 

986 name: the refname to read, relative to refpath 

987 Returns: The contents of the ref file, or None if the file does not 

988 exist. 

989 

990 Raises: 

991 IOError: if any other error occurs 

992 """ 

993 filename = self.refpath(name) 

994 try: 

995 with GitFile(filename, "rb") as f: 

996 header = f.read(len(SYMREF)) 

997 if header == SYMREF: 

998 # Read only the first line 

999 return header + next(iter(f)).rstrip(b"\r\n") 

1000 else: 

1001 # Read only the first 40 bytes 

1002 return header + f.read(40 - len(SYMREF)) 

1003 except (OSError, UnicodeError): 

1004 # don't assume anything specific about the error; in 

1005 # particular, invalid or forbidden paths can raise weird 

1006 # errors depending on the specific operating system 

1007 return None 

1008 

1009 def _remove_packed_ref(self, name: bytes) -> None: 

1010 if self._packed_refs is None: 

1011 return 

1012 filename = os.path.join(self.path, b"packed-refs") 

1013 # reread cached refs from disk, while holding the lock 

1014 f = GitFile(filename, "wb") 

1015 try: 

1016 self._packed_refs = None 

1017 self.get_packed_refs() 

1018 

1019 if self._packed_refs is None or name not in self._packed_refs: 

1020 f.abort() 

1021 return 

1022 

1023 del self._packed_refs[name] 

1024 if self._peeled_refs is not None: 

1025 with suppress(KeyError): 

1026 del self._peeled_refs[name] 

1027 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

1028 f.close() 

1029 except BaseException: 

1030 f.abort() 

1031 raise 

1032 

1033 def set_symbolic_ref( 

1034 self, 

1035 name: bytes, 

1036 other: bytes, 

1037 committer: bytes | None = None, 

1038 timestamp: int | None = None, 

1039 timezone: int | None = None, 

1040 message: bytes | None = None, 

1041 ) -> None: 

1042 """Make a ref point at another ref. 

1043 

1044 Args: 

1045 name: Name of the ref to set 

1046 other: Name of the ref to point at 

1047 committer: Optional committer name 

1048 timestamp: Optional timestamp 

1049 timezone: Optional timezone 

1050 message: Optional message to describe the change 

1051 """ 

1052 self._check_refname(name) 

1053 self._check_refname(other) 

1054 filename = self.refpath(name) 

1055 f = GitFile(filename, "wb") 

1056 try: 

1057 f.write(SYMREF + other + b"\n") 

1058 sha = self.follow(name)[-1] 

1059 self._log( 

1060 name, 

1061 sha, 

1062 sha, 

1063 committer=committer, 

1064 timestamp=timestamp, 

1065 timezone=timezone, 

1066 message=message, 

1067 ) 

1068 except BaseException: 

1069 f.abort() 

1070 raise 

1071 else: 

1072 f.close() 

1073 

1074 def set_if_equals( 

1075 self, 

1076 name: bytes, 

1077 old_ref: bytes | None, 

1078 new_ref: bytes, 

1079 committer: bytes | None = None, 

1080 timestamp: int | None = None, 

1081 timezone: int | None = None, 

1082 message: bytes | None = None, 

1083 ) -> bool: 

1084 """Set a refname to new_ref only if it currently equals old_ref. 

1085 

1086 This method follows all symbolic references, and can be used to perform 

1087 an atomic compare-and-swap operation. 

1088 

1089 Args: 

1090 name: The refname to set. 

1091 old_ref: The old sha the refname must refer to, or None to set 

1092 unconditionally. 

1093 new_ref: The new sha the refname will refer to. 

1094 committer: Optional committer name 

1095 timestamp: Optional timestamp 

1096 timezone: Optional timezone 

1097 message: Set message for reflog 

1098 Returns: True if the set was successful, False otherwise. 

1099 """ 

1100 self._check_refname(name) 

1101 try: 

1102 realnames, _ = self.follow(name) 

1103 realname = realnames[-1] 

1104 except (KeyError, IndexError, SymrefLoop): 

1105 realname = name 

1106 filename = self.refpath(realname) 

1107 

1108 # make sure none of the ancestor folders is in packed refs 

1109 probe_ref = os.path.dirname(realname) 

1110 packed_refs = self.get_packed_refs() 

1111 while probe_ref: 

1112 if packed_refs.get(probe_ref, None) is not None: 

1113 raise NotADirectoryError(filename) 

1114 probe_ref = os.path.dirname(probe_ref) 

1115 

1116 ensure_dir_exists(os.path.dirname(filename)) 

1117 with GitFile(filename, "wb") as f: 

1118 if old_ref is not None: 

1119 try: 

1120 # read again while holding the lock to handle race conditions 

1121 orig_ref = self.read_loose_ref(realname) 

1122 if orig_ref is None: 

1123 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA) 

1124 if orig_ref != old_ref: 

1125 f.abort() 

1126 return False 

1127 except OSError: 

1128 f.abort() 

1129 raise 

1130 

1131 # Check if ref already has the desired value while holding the lock 

1132 # This avoids fsync when ref is unchanged but still detects lock conflicts 

1133 current_ref = self.read_loose_ref(realname) 

1134 if current_ref is None: 

1135 current_ref = packed_refs.get(realname, None) 

1136 

1137 if current_ref is not None and current_ref == new_ref: 

1138 # Ref already has desired value, abort write to avoid fsync 

1139 f.abort() 

1140 return True 

1141 

1142 try: 

1143 f.write(new_ref + b"\n") 

1144 except OSError: 

1145 f.abort() 

1146 raise 

1147 self._log( 

1148 realname, 

1149 old_ref, 

1150 new_ref, 

1151 committer=committer, 

1152 timestamp=timestamp, 

1153 timezone=timezone, 

1154 message=message, 

1155 ) 

1156 return True 

1157 

1158 def add_if_new( 

1159 self, 

1160 name: bytes, 

1161 ref: bytes, 

1162 committer: bytes | None = None, 

1163 timestamp: int | None = None, 

1164 timezone: int | None = None, 

1165 message: bytes | None = None, 

1166 ) -> bool: 

1167 """Add a new reference only if it does not already exist. 

1168 

1169 This method follows symrefs, and only ensures that the last ref in the 

1170 chain does not exist. 

1171 

1172 Args: 

1173 name: The refname to set. 

1174 ref: The new sha the refname will refer to. 

1175 committer: Optional committer name 

1176 timestamp: Optional timestamp 

1177 timezone: Optional timezone 

1178 message: Optional message for reflog 

1179 Returns: True if the add was successful, False otherwise. 

1180 """ 

1181 try: 

1182 realnames, contents = self.follow(name) 

1183 if contents is not None: 

1184 return False 

1185 realname = realnames[-1] 

1186 except (KeyError, IndexError): 

1187 realname = name 

1188 self._check_refname(realname) 

1189 filename = self.refpath(realname) 

1190 ensure_dir_exists(os.path.dirname(filename)) 

1191 with GitFile(filename, "wb") as f: 

1192 if os.path.exists(filename) or name in self.get_packed_refs(): 

1193 f.abort() 

1194 return False 

1195 try: 

1196 f.write(ref + b"\n") 

1197 except OSError: 

1198 f.abort() 

1199 raise 

1200 else: 

1201 self._log( 

1202 name, 

1203 None, 

1204 ref, 

1205 committer=committer, 

1206 timestamp=timestamp, 

1207 timezone=timezone, 

1208 message=message, 

1209 ) 

1210 return True 

1211 

1212 def remove_if_equals( 

1213 self, 

1214 name: bytes, 

1215 old_ref: bytes | None, 

1216 committer: bytes | None = None, 

1217 timestamp: int | None = None, 

1218 timezone: int | None = None, 

1219 message: bytes | None = None, 

1220 ) -> bool: 

1221 """Remove a refname only if it currently equals old_ref. 

1222 

1223 This method does not follow symbolic references. It can be used to 

1224 perform an atomic compare-and-delete operation. 

1225 

1226 Args: 

1227 name: The refname to delete. 

1228 old_ref: The old sha the refname must refer to, or None to 

1229 delete unconditionally. 

1230 committer: Optional committer name 

1231 timestamp: Optional timestamp 

1232 timezone: Optional timezone 

1233 message: Optional message 

1234 Returns: True if the delete was successful, False otherwise. 

1235 """ 

1236 self._check_refname(name) 

1237 filename = self.refpath(name) 

1238 ensure_dir_exists(os.path.dirname(filename)) 

1239 f = GitFile(filename, "wb") 

1240 try: 

1241 if old_ref is not None: 

1242 orig_ref = self.read_loose_ref(name) 

1243 if orig_ref is None: 

1244 orig_ref = self.get_packed_refs().get(name, ZERO_SHA) 

1245 if orig_ref != old_ref: 

1246 return False 

1247 

1248 # remove the reference file itself 

1249 try: 

1250 found = os.path.lexists(filename) 

1251 except OSError: 

1252 # may only be packed, or otherwise unstorable 

1253 found = False 

1254 

1255 if found: 

1256 os.remove(filename) 

1257 

1258 self._remove_packed_ref(name) 

1259 self._log( 

1260 name, 

1261 old_ref, 

1262 None, 

1263 committer=committer, 

1264 timestamp=timestamp, 

1265 timezone=timezone, 

1266 message=message, 

1267 ) 

1268 finally: 

1269 # never write, we just wanted the lock 

1270 f.abort() 

1271 

1272 # outside of the lock, clean-up any parent directory that might now 

1273 # be empty. this ensures that re-creating a reference of the same 

1274 # name of what was previously a directory works as expected 

1275 parent = name 

1276 while True: 

1277 try: 

1278 parent, _ = parent.rsplit(b"/", 1) 

1279 except ValueError: 

1280 break 

1281 

1282 if parent == b"refs": 

1283 break 

1284 parent_filename = self.refpath(parent) 

1285 try: 

1286 os.rmdir(parent_filename) 

1287 except OSError: 

1288 # this can be caused by the parent directory being 

1289 # removed by another process, being not empty, etc. 

1290 # in any case, this is non fatal because we already 

1291 # removed the reference, just ignore it 

1292 break 

1293 

1294 return True 

1295 

1296 def pack_refs(self, all: bool = False) -> None: 

1297 """Pack loose refs into packed-refs file. 

1298 

1299 Args: 

1300 all: If True, pack all refs. If False, only pack tags. 

1301 """ 

1302 refs_to_pack: dict[Ref, ObjectID | None] = {} 

1303 for ref in self.allkeys(): 

1304 if ref == HEADREF: 

1305 # Never pack HEAD 

1306 continue 

1307 if all or ref.startswith(LOCAL_TAG_PREFIX): 

1308 try: 

1309 sha = self[ref] 

1310 if sha: 

1311 refs_to_pack[ref] = sha 

1312 except KeyError: 

1313 # Broken ref, skip it 

1314 pass 

1315 

1316 if refs_to_pack: 

1317 self.add_packed_refs(refs_to_pack) 

1318 

1319 

1320def _split_ref_line(line: bytes) -> tuple[bytes, bytes]: 

1321 """Split a single ref line into a tuple of SHA1 and name.""" 

1322 fields = line.rstrip(b"\n\r").split(b" ") 

1323 if len(fields) != 2: 

1324 raise PackedRefsException(f"invalid ref line {line!r}") 

1325 sha, name = fields 

1326 if not valid_hexsha(sha): 

1327 raise PackedRefsException(f"Invalid hex sha {sha!r}") 

1328 if not check_ref_format(name): 

1329 raise PackedRefsException(f"invalid ref name {name!r}") 

1330 return (sha, name) 

1331 

1332 

1333def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]: 

1334 """Read a packed refs file. 

1335 

1336 Args: 

1337 f: file-like object to read from 

1338 Returns: Iterator over tuples with SHA1s and ref names. 

1339 """ 

1340 for line in f: 

1341 if line.startswith(b"#"): 

1342 # Comment 

1343 continue 

1344 if line.startswith(b"^"): 

1345 raise PackedRefsException("found peeled ref in packed-refs without peeled") 

1346 yield _split_ref_line(line) 

1347 

1348 

1349def read_packed_refs_with_peeled( 

1350 f: IO[bytes], 

1351) -> Iterator[tuple[bytes, bytes, bytes | None]]: 

1352 """Read a packed refs file including peeled refs. 

1353 

1354 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples 

1355 with ref names, SHA1s, and peeled SHA1s (or None). 

1356 

1357 Args: 

1358 f: file-like object to read from, seek'ed to the second line 

1359 """ 

1360 last = None 

1361 for line in f: 

1362 if line.startswith(b"#"): 

1363 continue 

1364 line = line.rstrip(b"\r\n") 

1365 if line.startswith(b"^"): 

1366 if not last: 

1367 raise PackedRefsException("unexpected peeled ref line") 

1368 if not valid_hexsha(line[1:]): 

1369 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}") 

1370 sha, name = _split_ref_line(last) 

1371 last = None 

1372 yield (sha, name, line[1:]) 

1373 else: 

1374 if last: 

1375 sha, name = _split_ref_line(last) 

1376 yield (sha, name, None) 

1377 last = line 

1378 if last: 

1379 sha, name = _split_ref_line(last) 

1380 yield (sha, name, None) 

1381 

1382 

1383def write_packed_refs( 

1384 f: IO[bytes], 

1385 packed_refs: Mapping[bytes, bytes], 

1386 peeled_refs: Mapping[bytes, bytes] | None = None, 

1387) -> None: 

1388 """Write a packed refs file. 

1389 

1390 Args: 

1391 f: empty file-like object to write to 

1392 packed_refs: dict of refname to sha of packed refs to write 

1393 peeled_refs: dict of refname to peeled value of sha 

1394 """ 

1395 if peeled_refs is None: 

1396 peeled_refs = {} 

1397 else: 

1398 f.write(b"# pack-refs with: peeled\n") 

1399 for refname in sorted(packed_refs.keys()): 

1400 f.write(git_line(packed_refs[refname], refname)) 

1401 if refname in peeled_refs: 

1402 f.write(b"^" + peeled_refs[refname] + b"\n") 

1403 

1404 

1405def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]: 

1406 """Read info/refs file. 

1407 

1408 Args: 

1409 f: File-like object to read from 

1410 

1411 Returns: 

1412 Dictionary mapping ref names to SHA1s 

1413 """ 

1414 ret = {} 

1415 for line in f.readlines(): 

1416 (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1) 

1417 ret[name] = sha 

1418 return ret 

1419 

1420 

1421def write_info_refs( 

1422 refs: Mapping[bytes, bytes], store: ObjectContainer 

1423) -> Iterator[bytes]: 

1424 """Generate info refs.""" 

1425 # TODO: Avoid recursive import :( 

1426 from .object_store import peel_sha 

1427 

1428 for name, sha in sorted(refs.items()): 

1429 # get_refs() includes HEAD as a special case, but we don't want to 

1430 # advertise it 

1431 if name == HEADREF: 

1432 continue 

1433 try: 

1434 o = store[sha] 

1435 except KeyError: 

1436 continue 

1437 _unpeeled, peeled = peel_sha(store, sha) 

1438 yield o.id + b"\t" + name + b"\n" 

1439 if o.id != peeled.id: 

1440 yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n" 

1441 

1442 

1443def is_local_branch(x: bytes) -> bool: 

1444 """Check if a ref name is a local branch.""" 

1445 return x.startswith(LOCAL_BRANCH_PREFIX) 

1446 

1447 

1448def local_branch_name(name: bytes) -> bytes: 

1449 """Build a full branch ref from a short name. 

1450 

1451 Args: 

1452 name: Short branch name (e.g., b"master") or full ref 

1453 

1454 Returns: 

1455 Full branch ref name (e.g., b"refs/heads/master") 

1456 

1457 Examples: 

1458 >>> local_branch_name(b"master") 

1459 b'refs/heads/master' 

1460 >>> local_branch_name(b"refs/heads/master") 

1461 b'refs/heads/master' 

1462 """ 

1463 if name.startswith(LOCAL_BRANCH_PREFIX): 

1464 return name 

1465 return LOCAL_BRANCH_PREFIX + name 

1466 

1467 

1468def local_tag_name(name: bytes) -> bytes: 

1469 """Build a full tag ref from a short name. 

1470 

1471 Args: 

1472 name: Short tag name (e.g., b"v1.0") or full ref 

1473 

1474 Returns: 

1475 Full tag ref name (e.g., b"refs/tags/v1.0") 

1476 

1477 Examples: 

1478 >>> local_tag_name(b"v1.0") 

1479 b'refs/tags/v1.0' 

1480 >>> local_tag_name(b"refs/tags/v1.0") 

1481 b'refs/tags/v1.0' 

1482 """ 

1483 if name.startswith(LOCAL_TAG_PREFIX): 

1484 return name 

1485 return LOCAL_TAG_PREFIX + name 

1486 

1487 

1488def local_replace_name(name: bytes) -> bytes: 

1489 """Build a full replace ref from a short name. 

1490 

1491 Args: 

1492 name: Short replace name (object SHA) or full ref 

1493 

1494 Returns: 

1495 Full replace ref name (e.g., b"refs/replace/<sha>") 

1496 

1497 Examples: 

1498 >>> local_replace_name(b"abc123") 

1499 b'refs/replace/abc123' 

1500 >>> local_replace_name(b"refs/replace/abc123") 

1501 b'refs/replace/abc123' 

1502 """ 

1503 if name.startswith(LOCAL_REPLACE_PREFIX): 

1504 return name 

1505 return LOCAL_REPLACE_PREFIX + name 

1506 

1507 

1508def extract_branch_name(ref: bytes) -> bytes: 

1509 """Extract branch name from a full branch ref. 

1510 

1511 Args: 

1512 ref: Full branch ref (e.g., b"refs/heads/master") 

1513 

1514 Returns: 

1515 Short branch name (e.g., b"master") 

1516 

1517 Raises: 

1518 ValueError: If ref is not a local branch 

1519 

1520 Examples: 

1521 >>> extract_branch_name(b"refs/heads/master") 

1522 b'master' 

1523 >>> extract_branch_name(b"refs/heads/feature/foo") 

1524 b'feature/foo' 

1525 """ 

1526 if not ref.startswith(LOCAL_BRANCH_PREFIX): 

1527 raise ValueError(f"Not a local branch ref: {ref!r}") 

1528 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1529 

1530 

1531def extract_tag_name(ref: bytes) -> bytes: 

1532 """Extract tag name from a full tag ref. 

1533 

1534 Args: 

1535 ref: Full tag ref (e.g., b"refs/tags/v1.0") 

1536 

1537 Returns: 

1538 Short tag name (e.g., b"v1.0") 

1539 

1540 Raises: 

1541 ValueError: If ref is not a local tag 

1542 

1543 Examples: 

1544 >>> extract_tag_name(b"refs/tags/v1.0") 

1545 b'v1.0' 

1546 """ 

1547 if not ref.startswith(LOCAL_TAG_PREFIX): 

1548 raise ValueError(f"Not a local tag ref: {ref!r}") 

1549 return ref[len(LOCAL_TAG_PREFIX) :] 

1550 

1551 

1552def shorten_ref_name(ref: bytes) -> bytes: 

1553 """Convert a full ref name to its short form. 

1554 

1555 Args: 

1556 ref: Full ref name (e.g., b"refs/heads/master") 

1557 

1558 Returns: 

1559 Short ref name (e.g., b"master") 

1560 

1561 Examples: 

1562 >>> shorten_ref_name(b"refs/heads/master") 

1563 b'master' 

1564 >>> shorten_ref_name(b"refs/remotes/origin/main") 

1565 b'origin/main' 

1566 >>> shorten_ref_name(b"refs/tags/v1.0") 

1567 b'v1.0' 

1568 >>> shorten_ref_name(b"HEAD") 

1569 b'HEAD' 

1570 """ 

1571 if ref.startswith(LOCAL_BRANCH_PREFIX): 

1572 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1573 elif ref.startswith(LOCAL_REMOTE_PREFIX): 

1574 return ref[len(LOCAL_REMOTE_PREFIX) :] 

1575 elif ref.startswith(LOCAL_TAG_PREFIX): 

1576 return ref[len(LOCAL_TAG_PREFIX) :] 

1577 return ref 

1578 

1579 

1580T = TypeVar("T", dict[bytes, bytes], dict[bytes, bytes | None]) 

1581 

1582 

1583def strip_peeled_refs(refs: T) -> T: 

1584 """Remove all peeled refs.""" 

1585 return { 

1586 ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX) 

1587 } 

1588 

1589 

1590def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]: 

1591 """Split peeled refs from regular refs.""" 

1592 peeled: dict[bytes, bytes] = {} 

1593 regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)} 

1594 

1595 for ref, sha in refs.items(): 

1596 if ref.endswith(PEELED_TAG_SUFFIX): 

1597 # Only add to peeled dict if sha is not None 

1598 if sha is not None: 

1599 peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha 

1600 

1601 return regular, peeled 

1602 

1603 

1604def _set_origin_head( 

1605 refs: RefsContainer, origin: bytes, origin_head: bytes | None 

1606) -> None: 

1607 # set refs/remotes/origin/HEAD 

1608 origin_base = b"refs/remotes/" + origin + b"/" 

1609 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1610 origin_ref = origin_base + HEADREF 

1611 target_ref = origin_base + extract_branch_name(origin_head) 

1612 if target_ref in refs: 

1613 refs.set_symbolic_ref(origin_ref, target_ref) 

1614 

1615 

1616def _set_default_branch( 

1617 refs: RefsContainer, 

1618 origin: bytes, 

1619 origin_head: bytes | None, 

1620 branch: bytes | None, 

1621 ref_message: bytes | None, 

1622) -> bytes: 

1623 """Set the default branch.""" 

1624 origin_base = b"refs/remotes/" + origin + b"/" 

1625 if branch: 

1626 origin_ref = origin_base + branch 

1627 if origin_ref in refs: 

1628 local_ref = local_branch_name(branch) 

1629 refs.add_if_new(local_ref, refs[origin_ref], ref_message) 

1630 head_ref = local_ref 

1631 elif local_tag_name(branch) in refs: 

1632 head_ref = local_tag_name(branch) 

1633 else: 

1634 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag") 

1635 elif origin_head: 

1636 head_ref = origin_head 

1637 if origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1638 origin_ref = origin_base + extract_branch_name(origin_head) 

1639 else: 

1640 origin_ref = origin_head 

1641 try: 

1642 refs.add_if_new(head_ref, refs[origin_ref], ref_message) 

1643 except KeyError: 

1644 pass 

1645 else: 

1646 raise ValueError("neither origin_head nor branch are provided") 

1647 return head_ref 

1648 

1649 

1650def _set_head( 

1651 refs: RefsContainer, head_ref: bytes, ref_message: bytes | None 

1652) -> bytes | None: 

1653 if head_ref.startswith(LOCAL_TAG_PREFIX): 

1654 # detach HEAD at specified tag 

1655 head = refs[head_ref] 

1656 if isinstance(head, Tag): 

1657 _cls, obj = head.object 

1658 head = obj.get_object(obj).id 

1659 del refs[HEADREF] 

1660 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1661 else: 

1662 # set HEAD to specific branch 

1663 try: 

1664 head = refs[head_ref] 

1665 refs.set_symbolic_ref(HEADREF, head_ref) 

1666 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1667 except KeyError: 

1668 head = None 

1669 return head 

1670 

1671 

1672def _import_remote_refs( 

1673 refs_container: RefsContainer, 

1674 remote_name: str, 

1675 refs: dict[bytes, bytes | None], 

1676 message: bytes | None = None, 

1677 prune: bool = False, 

1678 prune_tags: bool = False, 

1679) -> None: 

1680 stripped_refs = strip_peeled_refs(refs) 

1681 branches = { 

1682 extract_branch_name(n): v 

1683 for (n, v) in stripped_refs.items() 

1684 if n.startswith(LOCAL_BRANCH_PREFIX) and v is not None 

1685 } 

1686 refs_container.import_refs( 

1687 b"refs/remotes/" + remote_name.encode(), 

1688 branches, 

1689 message=message, 

1690 prune=prune, 

1691 ) 

1692 tags = { 

1693 extract_tag_name(n): v 

1694 for (n, v) in stripped_refs.items() 

1695 if n.startswith(LOCAL_TAG_PREFIX) 

1696 and not n.endswith(PEELED_TAG_SUFFIX) 

1697 and v is not None 

1698 } 

1699 refs_container.import_refs( 

1700 LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags 

1701 ) 

1702 

1703 

1704def serialize_refs( 

1705 store: ObjectContainer, refs: Mapping[bytes, bytes] 

1706) -> dict[bytes, bytes]: 

1707 """Serialize refs with peeled refs. 

1708 

1709 Args: 

1710 store: Object store to peel refs from 

1711 refs: Dictionary of ref names to SHAs 

1712 

1713 Returns: 

1714 Dictionary with refs and peeled refs (marked with ^{}) 

1715 """ 

1716 # TODO: Avoid recursive import :( 

1717 from .object_store import peel_sha 

1718 

1719 ret = {} 

1720 for ref, sha in refs.items(): 

1721 try: 

1722 unpeeled, peeled = peel_sha(store, sha) 

1723 except KeyError: 

1724 warnings.warn( 

1725 "ref {} points at non-present sha {}".format( 

1726 ref.decode("utf-8", "replace"), sha.decode("ascii") 

1727 ), 

1728 UserWarning, 

1729 ) 

1730 continue 

1731 else: 

1732 if isinstance(unpeeled, Tag): 

1733 ret[ref + PEELED_TAG_SUFFIX] = peeled.id 

1734 ret[ref] = unpeeled.id 

1735 return ret 

1736 

1737 

1738class locked_ref: 

1739 """Lock a ref while making modifications. 

1740 

1741 Works as a context manager. 

1742 """ 

1743 

1744 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None: 

1745 """Initialize a locked ref. 

1746 

1747 Args: 

1748 refs_container: The DiskRefsContainer to lock the ref in 

1749 refname: The ref name to lock 

1750 """ 

1751 self._refs_container = refs_container 

1752 self._refname = refname 

1753 self._file: _GitFile | None = None 

1754 self._realname: Ref | None = None 

1755 self._deleted = False 

1756 

1757 def __enter__(self) -> "locked_ref": 

1758 """Enter the context manager and acquire the lock. 

1759 

1760 Returns: 

1761 This locked_ref instance 

1762 

1763 Raises: 

1764 OSError: If the lock cannot be acquired 

1765 """ 

1766 self._refs_container._check_refname(self._refname) 

1767 try: 

1768 realnames, _ = self._refs_container.follow(self._refname) 

1769 self._realname = realnames[-1] 

1770 except (KeyError, IndexError, SymrefLoop): 

1771 self._realname = self._refname 

1772 

1773 filename = self._refs_container.refpath(self._realname) 

1774 ensure_dir_exists(os.path.dirname(filename)) 

1775 f = GitFile(filename, "wb") 

1776 self._file = f 

1777 return self 

1778 

1779 def __exit__( 

1780 self, 

1781 exc_type: type | None, 

1782 exc_value: BaseException | None, 

1783 traceback: types.TracebackType | None, 

1784 ) -> None: 

1785 """Exit the context manager and release the lock. 

1786 

1787 Args: 

1788 exc_type: Type of exception if one occurred 

1789 exc_value: Exception instance if one occurred 

1790 traceback: Traceback if an exception occurred 

1791 """ 

1792 if self._file: 

1793 if exc_type is not None or self._deleted: 

1794 self._file.abort() 

1795 else: 

1796 self._file.close() 

1797 

1798 def get(self) -> bytes | None: 

1799 """Get the current value of the ref.""" 

1800 if not self._file: 

1801 raise RuntimeError("locked_ref not in context") 

1802 

1803 assert self._realname is not None 

1804 current_ref = self._refs_container.read_loose_ref(self._realname) 

1805 if current_ref is None: 

1806 current_ref = self._refs_container.get_packed_refs().get( 

1807 self._realname, None 

1808 ) 

1809 return current_ref 

1810 

1811 def ensure_equals(self, expected_value: bytes | None) -> bool: 

1812 """Ensure the ref currently equals the expected value. 

1813 

1814 Args: 

1815 expected_value: The expected current value of the ref 

1816 Returns: 

1817 True if the ref equals the expected value, False otherwise 

1818 """ 

1819 current_value = self.get() 

1820 return current_value == expected_value 

1821 

1822 def set(self, new_ref: bytes) -> None: 

1823 """Set the ref to a new value. 

1824 

1825 Args: 

1826 new_ref: The new SHA1 or symbolic ref value 

1827 """ 

1828 if not self._file: 

1829 raise RuntimeError("locked_ref not in context") 

1830 

1831 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)): 

1832 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref") 

1833 

1834 self._file.seek(0) 

1835 self._file.truncate() 

1836 self._file.write(new_ref + b"\n") 

1837 self._deleted = False 

1838 

1839 def set_symbolic_ref(self, target: Ref) -> None: 

1840 """Make this ref point at another ref. 

1841 

1842 Args: 

1843 target: Name of the ref to point at 

1844 """ 

1845 if not self._file: 

1846 raise RuntimeError("locked_ref not in context") 

1847 

1848 self._refs_container._check_refname(target) 

1849 self._file.seek(0) 

1850 self._file.truncate() 

1851 self._file.write(SYMREF + target + b"\n") 

1852 self._deleted = False 

1853 

1854 def delete(self) -> None: 

1855 """Delete the ref file while holding the lock.""" 

1856 if not self._file: 

1857 raise RuntimeError("locked_ref not in context") 

1858 

1859 # Delete the actual ref file while holding the lock 

1860 if self._realname: 

1861 filename = self._refs_container.refpath(self._realname) 

1862 try: 

1863 if os.path.lexists(filename): 

1864 os.remove(filename) 

1865 except FileNotFoundError: 

1866 pass 

1867 self._refs_container._remove_packed_ref(self._realname) 

1868 

1869 self._deleted = True 

1870 

1871 

1872class NamespacedRefsContainer(RefsContainer): 

1873 """Wrapper that adds namespace prefix to all ref operations. 

1874 

1875 This implements Git's GIT_NAMESPACE feature, which stores refs under 

1876 refs/namespaces/<namespace>/ and filters operations to only show refs 

1877 within that namespace. 

1878 

1879 Example: 

1880 With namespace "foo", a ref "refs/heads/master" is stored as 

1881 "refs/namespaces/foo/refs/heads/master" in the underlying container. 

1882 """ 

1883 

1884 def __init__(self, refs: RefsContainer, namespace: bytes) -> None: 

1885 """Initialize NamespacedRefsContainer. 

1886 

1887 Args: 

1888 refs: The underlying refs container to wrap 

1889 namespace: The namespace prefix (e.g., b"foo" or b"foo/bar") 

1890 """ 

1891 super().__init__(logger=refs._logger) 

1892 self._refs = refs 

1893 # Build namespace prefix: refs/namespaces/<namespace>/ 

1894 # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/ 

1895 namespace_parts = namespace.split(b"/") 

1896 self._namespace_prefix = b"" 

1897 for part in namespace_parts: 

1898 self._namespace_prefix += b"refs/namespaces/" + part + b"/" 

1899 

1900 def _apply_namespace(self, name: bytes) -> bytes: 

1901 """Apply namespace prefix to a ref name.""" 

1902 # HEAD and other special refs are not namespaced 

1903 if name == HEADREF or not name.startswith(b"refs/"): 

1904 return name 

1905 return self._namespace_prefix + name 

1906 

1907 def _strip_namespace(self, name: bytes) -> bytes | None: 

1908 """Remove namespace prefix from a ref name. 

1909 

1910 Returns None if the ref is not in our namespace. 

1911 """ 

1912 # HEAD and other special refs are not namespaced 

1913 if name == HEADREF or not name.startswith(b"refs/"): 

1914 return name 

1915 if name.startswith(self._namespace_prefix): 

1916 return name[len(self._namespace_prefix) :] 

1917 return None 

1918 

1919 def allkeys(self) -> set[bytes]: 

1920 """Return all reference keys in this namespace.""" 

1921 keys = set() 

1922 for key in self._refs.allkeys(): 

1923 stripped = self._strip_namespace(key) 

1924 if stripped is not None: 

1925 keys.add(stripped) 

1926 return keys 

1927 

1928 def read_loose_ref(self, name: bytes) -> bytes | None: 

1929 """Read a loose reference.""" 

1930 return self._refs.read_loose_ref(self._apply_namespace(name)) 

1931 

1932 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

1933 """Get packed refs within this namespace.""" 

1934 packed = {} 

1935 for name, value in self._refs.get_packed_refs().items(): 

1936 stripped = self._strip_namespace(name) 

1937 if stripped is not None: 

1938 packed[stripped] = value 

1939 return packed 

1940 

1941 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

1942 """Add packed refs with namespace prefix.""" 

1943 namespaced_refs = { 

1944 self._apply_namespace(name): value for name, value in new_refs.items() 

1945 } 

1946 self._refs.add_packed_refs(namespaced_refs) 

1947 

1948 def get_peeled(self, name: bytes) -> ObjectID | None: 

1949 """Return the cached peeled value of a ref.""" 

1950 return self._refs.get_peeled(self._apply_namespace(name)) 

1951 

1952 def set_symbolic_ref( 

1953 self, 

1954 name: bytes, 

1955 other: bytes, 

1956 committer: bytes | None = None, 

1957 timestamp: int | None = None, 

1958 timezone: int | None = None, 

1959 message: bytes | None = None, 

1960 ) -> None: 

1961 """Make a ref point at another ref.""" 

1962 self._refs.set_symbolic_ref( 

1963 self._apply_namespace(name), 

1964 self._apply_namespace(other), 

1965 committer=committer, 

1966 timestamp=timestamp, 

1967 timezone=timezone, 

1968 message=message, 

1969 ) 

1970 

1971 def set_if_equals( 

1972 self, 

1973 name: bytes, 

1974 old_ref: bytes | None, 

1975 new_ref: bytes, 

1976 committer: bytes | None = None, 

1977 timestamp: int | None = None, 

1978 timezone: int | None = None, 

1979 message: bytes | None = None, 

1980 ) -> bool: 

1981 """Set a refname to new_ref only if it currently equals old_ref.""" 

1982 return self._refs.set_if_equals( 

1983 self._apply_namespace(name), 

1984 old_ref, 

1985 new_ref, 

1986 committer=committer, 

1987 timestamp=timestamp, 

1988 timezone=timezone, 

1989 message=message, 

1990 ) 

1991 

1992 def add_if_new( 

1993 self, 

1994 name: bytes, 

1995 ref: bytes, 

1996 committer: bytes | None = None, 

1997 timestamp: int | None = None, 

1998 timezone: int | None = None, 

1999 message: bytes | None = None, 

2000 ) -> bool: 

2001 """Add a new reference only if it does not already exist.""" 

2002 return self._refs.add_if_new( 

2003 self._apply_namespace(name), 

2004 ref, 

2005 committer=committer, 

2006 timestamp=timestamp, 

2007 timezone=timezone, 

2008 message=message, 

2009 ) 

2010 

2011 def remove_if_equals( 

2012 self, 

2013 name: bytes, 

2014 old_ref: bytes | None, 

2015 committer: bytes | None = None, 

2016 timestamp: int | None = None, 

2017 timezone: int | None = None, 

2018 message: bytes | None = None, 

2019 ) -> bool: 

2020 """Remove a refname only if it currently equals old_ref.""" 

2021 return self._refs.remove_if_equals( 

2022 self._apply_namespace(name), 

2023 old_ref, 

2024 committer=committer, 

2025 timestamp=timestamp, 

2026 timezone=timezone, 

2027 message=message, 

2028 ) 

2029 

2030 def pack_refs(self, all: bool = False) -> None: 

2031 """Pack loose refs into packed-refs file. 

2032 

2033 Note: This packs all refs in the underlying container, not just 

2034 those in the namespace. 

2035 """ 

2036 self._refs.pack_refs(all=all) 

2037 

2038 

2039def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T: 

2040 """Filter refs to only include those with a given prefix. 

2041 

2042 Args: 

2043 refs: A dictionary of refs. 

2044 prefixes: The prefixes to filter by. 

2045 """ 

2046 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)} 

2047 return filtered 

2048 

2049 

2050def is_per_worktree_ref(ref: bytes) -> bool: 

2051 """Returns whether a reference is stored per worktree or not. 

2052 

2053 Per-worktree references are: 

2054 - all pseudorefs, e.g. HEAD 

2055 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/" 

2056 

2057 All refs starting with "refs/" are shared, except for the ones listed above. 

2058 

2059 See https://git-scm.com/docs/git-worktree#_refs. 

2060 """ 

2061 return not ref.startswith(b"refs/") or ref.startswith( 

2062 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/") 

2063 )