Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

781 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25__all__ = [ 

26 "HEADREF", 

27 "LOCAL_BRANCH_PREFIX", 

28 "LOCAL_NOTES_PREFIX", 

29 "LOCAL_REMOTE_PREFIX", 

30 "LOCAL_REPLACE_PREFIX", 

31 "LOCAL_TAG_PREFIX", 

32 "SYMREF", 

33 "DictRefsContainer", 

34 "DiskRefsContainer", 

35 "NamespacedRefsContainer", 

36 "Ref", 

37 "RefsContainer", 

38 "SymrefLoop", 

39 "check_ref_format", 

40 "extract_branch_name", 

41 "extract_tag_name", 

42 "filter_ref_prefix", 

43 "is_local_branch", 

44 "is_per_worktree_ref", 

45 "local_branch_name", 

46 "local_replace_name", 

47 "local_tag_name", 

48 "parse_remote_ref", 

49 "parse_symref_value", 

50 "read_info_refs", 

51 "read_packed_refs", 

52 "read_packed_refs_with_peeled", 

53 "set_ref_from_raw", 

54 "shorten_ref_name", 

55 "write_packed_refs", 

56] 

57 

58import os 

59import sys 

60import types 

61import warnings 

62from collections.abc import Callable, Iterable, Iterator, Mapping 

63from contextlib import suppress 

64from typing import ( 

65 IO, 

66 TYPE_CHECKING, 

67 Any, 

68 BinaryIO, 

69 NewType, 

70 TypeVar, 

71) 

72 

73if sys.version_info >= (3, 11): 

74 from typing import Self 

75else: 

76 from typing_extensions import Self 

77 

78if TYPE_CHECKING: 

79 from .file import _GitFile 

80 

81from .errors import PackedRefsException, RefFormatError 

82from .file import GitFile, ensure_dir_exists 

83from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

84 

85Ref = NewType("Ref", bytes) 

86 

87T = TypeVar("T", dict[Ref, ObjectID], dict[Ref, ObjectID | None]) 

88 

89HEADREF = Ref(b"HEAD") 

90SYMREF = b"ref: " 

91LOCAL_BRANCH_PREFIX = b"refs/heads/" 

92LOCAL_TAG_PREFIX = b"refs/tags/" 

93LOCAL_REMOTE_PREFIX = b"refs/remotes/" 

94LOCAL_NOTES_PREFIX = b"refs/notes/" 

95LOCAL_REPLACE_PREFIX = b"refs/replace/" 

96BAD_REF_CHARS: set[int] = set(b"\177 ~^:?*[") 

97 

98 

99class SymrefLoop(Exception): 

100 """There is a loop between one or more symrefs.""" 

101 

102 def __init__(self, ref: bytes, depth: int) -> None: 

103 """Initialize SymrefLoop exception.""" 

104 self.ref = ref 

105 self.depth = depth 

106 

107 

108def parse_symref_value(contents: bytes) -> bytes: 

109 """Parse a symref value. 

110 

111 Args: 

112 contents: Contents to parse 

113 Returns: Destination 

114 """ 

115 if contents.startswith(SYMREF): 

116 return contents[len(SYMREF) :].rstrip(b"\r\n") 

117 raise ValueError(contents) 

118 

119 

120def check_ref_format(refname: Ref) -> bool: 

121 """Check if a refname is correctly formatted. 

122 

123 Implements all the same rules as git-check-ref-format[1]. 

124 

125 [1] 

126 http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html 

127 

128 Args: 

129 refname: The refname to check 

130 Returns: True if refname is valid, False otherwise 

131 """ 

132 # These could be combined into one big expression, but are listed 

133 # separately to parallel [1]. 

134 if refname == b"@": 

135 return False 

136 if b"/" not in refname: # type: ignore[comparison-overlap] 

137 return False 

138 if b".." in refname: # type: ignore[comparison-overlap] 

139 return False 

140 for i, c in enumerate(refname): 

141 if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS: 

142 return False 

143 if refname[-1] in b"/.": 

144 return False 

145 if b"@{" in refname: # type: ignore[comparison-overlap] 

146 return False 

147 if b"\\" in refname: # type: ignore[comparison-overlap] 

148 return False 

149 for component in refname.split(b"/"): 

150 if not component: 

151 return False 

152 if component.startswith(b"."): 

153 return False 

154 if component.endswith(b".lock"): 

155 return False 

156 return True 

157 

158 

159def _collapse_slashes(refname: bytes) -> bytes: 

160 """Collapse runs of consecutive slashes in a ref name into a single slash.""" 

161 return b"/".join(component for component in refname.split(b"/") if component) 

162 

163 

164def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]: 

165 """Parse a remote ref into remote name and branch name. 

166 

167 Args: 

168 ref: Remote ref like b"refs/remotes/origin/main" 

169 

170 Returns: 

171 Tuple of (remote_name, branch_name) 

172 

173 Raises: 

174 ValueError: If ref is not a valid remote ref 

175 """ 

176 if not ref.startswith(LOCAL_REMOTE_PREFIX): 

177 raise ValueError(f"Not a remote ref: {ref!r}") 

178 

179 # Remove the prefix 

180 remainder = ref[len(LOCAL_REMOTE_PREFIX) :] 

181 

182 # Split into remote name and branch name 

183 parts = remainder.split(b"/", 1) 

184 if len(parts) != 2: 

185 raise ValueError(f"Invalid remote ref format: {ref!r}") 

186 

187 remote_name, branch_name = parts 

188 return (remote_name, branch_name) 

189 

190 

191def set_ref_from_raw(refs: "RefsContainer", name: Ref, raw_ref: bytes) -> None: 

192 """Set a reference from a raw ref value. 

193 

194 This handles both symbolic refs (starting with 'ref: ') and direct ObjectID refs. 

195 

196 Args: 

197 refs: The RefsContainer to set the ref in 

198 name: The ref name to set 

199 raw_ref: The raw ref value (either a symbolic ref or an ObjectID) 

200 """ 

201 if raw_ref.startswith(SYMREF): 

202 # It's a symbolic ref 

203 target = Ref(raw_ref[len(SYMREF) :]) 

204 refs.set_symbolic_ref(name, target) 

205 else: 

206 # It's a direct ObjectID 

207 refs[name] = ObjectID(raw_ref) 

208 

209 

210class RefsContainer: 

211 """A container for refs.""" 

212 

213 def __init__( 

214 self, 

215 logger: Callable[ 

216 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

217 ] 

218 | None = None, 

219 ) -> None: 

220 """Initialize RefsContainer with optional logger function.""" 

221 self._logger = logger 

222 

223 def _log( 

224 self, 

225 ref: bytes, 

226 old_sha: bytes | None, 

227 new_sha: bytes | None, 

228 committer: bytes | None = None, 

229 timestamp: int | None = None, 

230 timezone: int | None = None, 

231 message: bytes | None = None, 

232 ) -> None: 

233 if self._logger is None: 

234 return 

235 if message is None: 

236 return 

237 # Use ZERO_SHA for None values, matching git behavior 

238 if old_sha is None: 

239 old_sha = ZERO_SHA 

240 if new_sha is None: 

241 new_sha = ZERO_SHA 

242 self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message) 

243 

244 def set_symbolic_ref( 

245 self, 

246 name: Ref, 

247 other: Ref, 

248 committer: bytes | None = None, 

249 timestamp: int | None = None, 

250 timezone: int | None = None, 

251 message: bytes | None = None, 

252 ) -> None: 

253 """Make a ref point at another ref. 

254 

255 Args: 

256 name: Name of the ref to set 

257 other: Name of the ref to point at 

258 committer: Optional committer name/email 

259 timestamp: Optional timestamp 

260 timezone: Optional timezone 

261 message: Optional message 

262 """ 

263 raise NotImplementedError(self.set_symbolic_ref) 

264 

265 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

266 """Get contents of the packed-refs file. 

267 

268 Returns: Dictionary mapping ref names to SHA1s 

269 

270 Note: Will return an empty dictionary when no packed-refs file is 

271 present. 

272 """ 

273 raise NotImplementedError(self.get_packed_refs) 

274 

275 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

276 """Add the given refs as packed refs. 

277 

278 Args: 

279 new_refs: A mapping of ref names to targets; if a target is None that 

280 means remove the ref 

281 """ 

282 raise NotImplementedError(self.add_packed_refs) 

283 

284 def get_peeled(self, name: Ref) -> ObjectID | None: 

285 """Return the cached peeled value of a ref, if available. 

286 

287 Args: 

288 name: Name of the ref to peel 

289 Returns: The peeled value of the ref. If the ref is known not point to 

290 a tag, this will be the SHA the ref refers to. If the ref may point 

291 to a tag, but no cached information is available, None is returned. 

292 """ 

293 return None 

294 

295 def import_refs( 

296 self, 

297 base: Ref, 

298 other: Mapping[Ref, ObjectID | None], 

299 committer: bytes | None = None, 

300 timestamp: bytes | None = None, 

301 timezone: bytes | None = None, 

302 message: bytes | None = None, 

303 prune: bool = False, 

304 ) -> None: 

305 """Import refs from another repository. 

306 

307 Args: 

308 base: Base ref to import into (e.g., b'refs/remotes/origin') 

309 other: Dictionary of refs to import 

310 committer: Optional committer for reflog 

311 timestamp: Optional timestamp for reflog 

312 timezone: Optional timezone for reflog 

313 message: Optional message for reflog 

314 prune: If True, remove refs not in other 

315 """ 

316 # Strip a trailing slash so joining ``base`` with a ref name below 

317 # does not produce a malformed ref with an empty path component 

318 # (e.g. b'refs/tags/' + b'/' + b'v1.0' -> b'refs/tags//v1.0'). 

319 base = Ref(base.rstrip(b"/")) 

320 if prune: 

321 to_delete = set(self.subkeys(base)) 

322 else: 

323 to_delete = set() 

324 for name, value in other.items(): 

325 if value is None: 

326 to_delete.add(name) 

327 else: 

328 self.set_if_equals( 

329 Ref(b"/".join((base, name))), None, value, message=message 

330 ) 

331 if to_delete: 

332 try: 

333 to_delete.remove(name) 

334 except KeyError: 

335 pass 

336 for ref in to_delete: 

337 self.remove_if_equals(Ref(b"/".join((base, ref))), None, message=message) 

338 

339 def allkeys(self) -> set[Ref]: 

340 """All refs present in this container.""" 

341 raise NotImplementedError(self.allkeys) 

342 

343 def __iter__(self) -> Iterator[Ref]: 

344 """Iterate over all reference keys.""" 

345 return iter(self.allkeys()) 

346 

347 def keys(self, base: Ref | None = None) -> set[Ref]: 

348 """Refs present in this container. 

349 

350 Args: 

351 base: An optional base to return refs under. 

352 Returns: An unsorted set of valid refs in this container, including 

353 packed refs. 

354 """ 

355 if base is not None: 

356 return self.subkeys(base) 

357 else: 

358 return self.allkeys() 

359 

360 def subkeys(self, base: Ref) -> set[Ref]: 

361 """Refs present in this container under a base. 

362 

363 Args: 

364 base: The base to return refs under. 

365 Returns: A set of valid refs in this container under the base; the base 

366 prefix is stripped from the ref names returned. 

367 """ 

368 keys: set[Ref] = set() 

369 base_len = len(base) + 1 

370 for refname in self.allkeys(): 

371 if refname.startswith(base): 

372 keys.add(Ref(refname[base_len:])) 

373 return keys 

374 

375 def as_dict(self, base: Ref | None = None) -> dict[Ref, ObjectID]: 

376 """Return the contents of this container as a dictionary.""" 

377 ret: dict[Ref, ObjectID] = {} 

378 keys = self.keys(base) 

379 base_bytes: bytes 

380 if base is None: 

381 base_bytes = b"" 

382 else: 

383 base_bytes = base.rstrip(b"/") 

384 for key in keys: 

385 try: 

386 ret[key] = self[Ref((base_bytes + b"/" + key).strip(b"/"))] 

387 except (SymrefLoop, KeyError): 

388 continue # Unable to resolve 

389 

390 return ret 

391 

392 def _check_refname(self, name: Ref) -> None: 

393 """Ensure a refname is valid and lives in refs or is HEAD. 

394 

395 HEAD is not a valid refname according to git-check-ref-format, but this 

396 class needs to be able to touch HEAD. Also, check_ref_format expects 

397 refnames without the leading 'refs/', but this class requires that 

398 so it cannot touch anything outside the refs dir (or HEAD). 

399 

400 Args: 

401 name: The name of the reference. 

402 

403 Raises: 

404 KeyError: if a refname is not HEAD or is otherwise not valid. 

405 """ 

406 if name in (HEADREF, Ref(b"refs/stash")): 

407 return 

408 if not name.startswith(b"refs/"): 

409 raise RefFormatError(name) 

410 rest = Ref(name[5:]) 

411 if check_ref_format(rest): 

412 return 

413 # As of Dulwich 1.2.3 check_ref_format rejects empty path components 

414 # (e.g. b'refs/tags//v1.0'). Such names were silently accepted before, 

415 # and some callers (e.g. older Poetry releases) still construct them. 

416 # Warn rather than raise for now if collapsing repeated slashes would 

417 # make the name valid, so the only defect is empty components. 

418 if ( 

419 b"//" in name # type: ignore[comparison-overlap] 

420 and check_ref_format(Ref(_collapse_slashes(rest))) 

421 ): 

422 warnings.warn( 

423 f"Ref name {name!r} contains empty path components; " 

424 "this will be rejected in a future version of Dulwich.", 

425 DeprecationWarning, 

426 stacklevel=3, 

427 ) 

428 return 

429 raise RefFormatError(name) 

430 

431 def read_ref(self, refname: Ref) -> bytes | None: 

432 """Read a reference without following any references. 

433 

434 Args: 

435 refname: The name of the reference 

436 Returns: The contents of the ref file, or None if it does 

437 not exist. 

438 """ 

439 contents = self.read_loose_ref(refname) 

440 if not contents: 

441 contents = self.get_packed_refs().get(refname, None) 

442 return contents 

443 

444 def read_loose_ref(self, name: Ref) -> bytes | None: 

445 """Read a loose reference and return its contents. 

446 

447 Args: 

448 name: the refname to read 

449 Returns: The contents of the ref file, or None if it does 

450 not exist. 

451 """ 

452 raise NotImplementedError(self.read_loose_ref) 

453 

454 def follow(self, name: Ref) -> tuple[list[Ref], ObjectID | None]: 

455 """Follow a reference name. 

456 

457 Returns: a tuple of (refnames, sha), wheres refnames are the names of 

458 references in the chain 

459 """ 

460 contents: bytes | None = SYMREF + name 

461 depth = 0 

462 refnames: list[Ref] = [] 

463 while contents and contents.startswith(SYMREF): 

464 refname = Ref(contents[len(SYMREF) :]) 

465 refnames.append(refname) 

466 contents = self.read_ref(refname) 

467 if not contents: 

468 break 

469 depth += 1 

470 if depth > 5: 

471 raise SymrefLoop(name, depth) 

472 return refnames, ObjectID(contents) if contents else None 

473 

474 def __contains__(self, refname: Ref) -> bool: 

475 """Check if a reference exists.""" 

476 if self.read_ref(refname): 

477 return True 

478 return False 

479 

480 def __getitem__(self, name: Ref) -> ObjectID: 

481 """Get the SHA1 for a reference name. 

482 

483 This method follows all symbolic references. 

484 """ 

485 _, sha = self.follow(name) 

486 if sha is None: 

487 raise KeyError(name) 

488 return sha 

489 

490 def set_if_equals( 

491 self, 

492 name: Ref, 

493 old_ref: ObjectID | None, 

494 new_ref: ObjectID, 

495 committer: bytes | None = None, 

496 timestamp: int | None = None, 

497 timezone: int | None = None, 

498 message: bytes | None = None, 

499 ) -> bool: 

500 """Set a refname to new_ref only if it currently equals old_ref. 

501 

502 This method follows all symbolic references if applicable for the 

503 subclass, and can be used to perform an atomic compare-and-swap 

504 operation. 

505 

506 Args: 

507 name: The refname to set. 

508 old_ref: The old sha the refname must refer to, or None to set 

509 unconditionally. 

510 new_ref: The new sha the refname will refer to. 

511 committer: Optional committer name/email 

512 timestamp: Optional timestamp 

513 timezone: Optional timezone 

514 message: Message for reflog 

515 Returns: True if the set was successful, False otherwise. 

516 """ 

517 raise NotImplementedError(self.set_if_equals) 

518 

519 def add_if_new( 

520 self, 

521 name: Ref, 

522 ref: ObjectID, 

523 committer: bytes | None = None, 

524 timestamp: int | None = None, 

525 timezone: int | None = None, 

526 message: bytes | None = None, 

527 ) -> bool: 

528 """Add a new reference only if it does not already exist. 

529 

530 Args: 

531 name: Ref name 

532 ref: Ref value 

533 committer: Optional committer name/email 

534 timestamp: Optional timestamp 

535 timezone: Optional timezone 

536 message: Optional message for reflog 

537 """ 

538 raise NotImplementedError(self.add_if_new) 

539 

540 def __setitem__(self, name: Ref, ref: ObjectID) -> None: 

541 """Set a reference name to point to the given SHA1. 

542 

543 This method follows all symbolic references if applicable for the 

544 subclass. 

545 

546 Note: This method unconditionally overwrites the contents of a 

547 reference. To update atomically only if the reference has not 

548 changed, use set_if_equals(). 

549 

550 Args: 

551 name: The refname to set. 

552 ref: The new sha the refname will refer to. 

553 """ 

554 if not (valid_hexsha(ref) or ref.startswith(SYMREF)): 

555 raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref") 

556 self.set_if_equals(name, None, ref) 

557 

558 def remove_if_equals( 

559 self, 

560 name: Ref, 

561 old_ref: ObjectID | None, 

562 committer: bytes | None = None, 

563 timestamp: int | None = None, 

564 timezone: int | None = None, 

565 message: bytes | None = None, 

566 ) -> bool: 

567 """Remove a refname only if it currently equals old_ref. 

568 

569 This method does not follow symbolic references, even if applicable for 

570 the subclass. It can be used to perform an atomic compare-and-delete 

571 operation. 

572 

573 Args: 

574 name: The refname to delete. 

575 old_ref: The old sha the refname must refer to, or None to 

576 delete unconditionally. 

577 committer: Optional committer name/email 

578 timestamp: Optional timestamp 

579 timezone: Optional timezone 

580 message: Message for reflog 

581 Returns: True if the delete was successful, False otherwise. 

582 """ 

583 raise NotImplementedError(self.remove_if_equals) 

584 

585 def __delitem__(self, name: Ref) -> None: 

586 """Remove a refname. 

587 

588 This method does not follow symbolic references, even if applicable for 

589 the subclass. 

590 

591 Note: This method unconditionally deletes the contents of a reference. 

592 To delete atomically only if the reference has not changed, use 

593 remove_if_equals(). 

594 

595 Args: 

596 name: The refname to delete. 

597 """ 

598 self.remove_if_equals(name, None) 

599 

600 def get_symrefs(self) -> dict[Ref, Ref]: 

601 """Get a dict with all symrefs in this container. 

602 

603 Returns: Dictionary mapping source ref to target ref 

604 """ 

605 ret: dict[Ref, Ref] = {} 

606 for src in self.allkeys(): 

607 try: 

608 ref_value = self.read_ref(src) 

609 assert ref_value is not None 

610 dst = parse_symref_value(ref_value) 

611 except ValueError: 

612 pass 

613 else: 

614 ret[src] = Ref(dst) 

615 return ret 

616 

617 def pack_refs(self, all: bool = False) -> None: 

618 """Pack loose refs into packed-refs file. 

619 

620 Args: 

621 all: If True, pack all refs. If False, only pack tags. 

622 """ 

623 raise NotImplementedError(self.pack_refs) 

624 

625 

626class DictRefsContainer(RefsContainer): 

627 """RefsContainer backed by a simple dict. 

628 

629 This container does not support symbolic or packed references and is not 

630 threadsafe. 

631 """ 

632 

633 def __init__( 

634 self, 

635 refs: dict[Ref, bytes], 

636 logger: Callable[ 

637 [ 

638 bytes, 

639 bytes | None, 

640 bytes | None, 

641 bytes | None, 

642 int | None, 

643 int | None, 

644 bytes | None, 

645 ], 

646 None, 

647 ] 

648 | None = None, 

649 ) -> None: 

650 """Initialize DictRefsContainer with refs dictionary and optional logger.""" 

651 super().__init__(logger=logger) 

652 self._refs = refs 

653 self._peeled: dict[Ref, ObjectID] = {} 

654 self._watchers: set[Any] = set() 

655 

656 def allkeys(self) -> set[Ref]: 

657 """Return all reference keys.""" 

658 return set(self._refs.keys()) 

659 

660 def read_loose_ref(self, name: Ref) -> bytes | None: 

661 """Read a loose reference.""" 

662 return self._refs.get(name, None) 

663 

664 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

665 """Get packed references.""" 

666 return {} 

667 

668 def _notify(self, ref: bytes, newsha: bytes | None) -> None: 

669 for watcher in self._watchers: 

670 watcher._notify((ref, newsha)) 

671 

672 def set_symbolic_ref( 

673 self, 

674 name: Ref, 

675 other: Ref, 

676 committer: bytes | None = None, 

677 timestamp: int | None = None, 

678 timezone: int | None = None, 

679 message: bytes | None = None, 

680 ) -> None: 

681 """Make a ref point at another ref. 

682 

683 Args: 

684 name: Name of the ref to set 

685 other: Name of the ref to point at 

686 committer: Optional committer name for reflog 

687 timestamp: Optional timestamp for reflog 

688 timezone: Optional timezone for reflog 

689 message: Optional message for reflog 

690 """ 

691 old = self.follow(name)[-1] 

692 new = SYMREF + other 

693 self._refs[name] = new 

694 self._notify(name, new) 

695 self._log( 

696 name, 

697 old, 

698 new, 

699 committer=committer, 

700 timestamp=timestamp, 

701 timezone=timezone, 

702 message=message, 

703 ) 

704 

705 def set_if_equals( 

706 self, 

707 name: Ref, 

708 old_ref: ObjectID | None, 

709 new_ref: ObjectID, 

710 committer: bytes | None = None, 

711 timestamp: int | None = None, 

712 timezone: int | None = None, 

713 message: bytes | None = None, 

714 ) -> bool: 

715 """Set a refname to new_ref only if it currently equals old_ref. 

716 

717 This method follows all symbolic references, and can be used to perform 

718 an atomic compare-and-swap operation. 

719 

720 Args: 

721 name: The refname to set. 

722 old_ref: The old sha the refname must refer to, or None to set 

723 unconditionally. 

724 new_ref: The new sha the refname will refer to. 

725 committer: Optional committer name for reflog 

726 timestamp: Optional timestamp for reflog 

727 timezone: Optional timezone for reflog 

728 message: Optional message for reflog 

729 

730 Returns: 

731 True if the set was successful, False otherwise. 

732 """ 

733 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

734 return False 

735 # Only update the specific ref requested, not the whole chain 

736 self._check_refname(name) 

737 old = self._refs.get(name) 

738 self._refs[name] = new_ref 

739 self._notify(name, new_ref) 

740 self._log( 

741 name, 

742 old, 

743 new_ref, 

744 committer=committer, 

745 timestamp=timestamp, 

746 timezone=timezone, 

747 message=message, 

748 ) 

749 return True 

750 

751 def add_if_new( 

752 self, 

753 name: Ref, 

754 ref: ObjectID, 

755 committer: bytes | None = None, 

756 timestamp: int | None = None, 

757 timezone: int | None = None, 

758 message: bytes | None = None, 

759 ) -> bool: 

760 """Add a new reference only if it does not already exist. 

761 

762 Args: 

763 name: Ref name 

764 ref: Ref value 

765 committer: Optional committer name for reflog 

766 timestamp: Optional timestamp for reflog 

767 timezone: Optional timezone for reflog 

768 message: Optional message for reflog 

769 

770 Returns: 

771 True if the add was successful, False otherwise. 

772 """ 

773 if name in self._refs: 

774 return False 

775 self._refs[name] = ref 

776 self._notify(name, ref) 

777 self._log( 

778 name, 

779 None, 

780 ref, 

781 committer=committer, 

782 timestamp=timestamp, 

783 timezone=timezone, 

784 message=message, 

785 ) 

786 return True 

787 

788 def remove_if_equals( 

789 self, 

790 name: Ref, 

791 old_ref: ObjectID | None, 

792 committer: bytes | None = None, 

793 timestamp: int | None = None, 

794 timezone: int | None = None, 

795 message: bytes | None = None, 

796 ) -> bool: 

797 """Remove a refname only if it currently equals old_ref. 

798 

799 This method does not follow symbolic references. It can be used to 

800 perform an atomic compare-and-delete operation. 

801 

802 Args: 

803 name: The refname to delete. 

804 old_ref: The old sha the refname must refer to, or None to 

805 delete unconditionally. 

806 committer: Optional committer name for reflog 

807 timestamp: Optional timestamp for reflog 

808 timezone: Optional timezone for reflog 

809 message: Optional message for reflog 

810 

811 Returns: 

812 True if the delete was successful, False otherwise. 

813 """ 

814 if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref: 

815 return False 

816 try: 

817 old = self._refs.pop(name) 

818 except KeyError: 

819 pass 

820 else: 

821 self._notify(name, None) 

822 self._log( 

823 name, 

824 old, 

825 None, 

826 committer=committer, 

827 timestamp=timestamp, 

828 timezone=timezone, 

829 message=message, 

830 ) 

831 return True 

832 

833 def get_peeled(self, name: Ref) -> ObjectID | None: 

834 """Get peeled version of a reference.""" 

835 return self._peeled.get(name) 

836 

837 def _update(self, refs: Mapping[Ref, ObjectID]) -> None: 

838 """Update multiple refs; intended only for testing.""" 

839 # TODO(dborowitz): replace this with a public function that uses 

840 # set_if_equal. 

841 for ref, sha in refs.items(): 

842 self.set_if_equals(ref, None, sha) 

843 

844 def _update_peeled(self, peeled: Mapping[Ref, ObjectID]) -> None: 

845 """Update cached peeled refs; intended only for testing.""" 

846 self._peeled.update(peeled) 

847 

848 

849class DiskRefsContainer(RefsContainer): 

850 """Refs container that reads refs from disk.""" 

851 

852 def __init__( 

853 self, 

854 path: str | bytes | os.PathLike[str], 

855 worktree_path: str | bytes | os.PathLike[str] | None = None, 

856 logger: Callable[ 

857 [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None 

858 ] 

859 | None = None, 

860 ) -> None: 

861 """Initialize DiskRefsContainer.""" 

862 super().__init__(logger=logger) 

863 # Convert path-like objects to strings, then to bytes for Git compatibility 

864 self.path = os.fsencode(os.fspath(path)) 

865 if worktree_path is None: 

866 self.worktree_path = self.path 

867 else: 

868 self.worktree_path = os.fsencode(os.fspath(worktree_path)) 

869 self._packed_refs: dict[Ref, ObjectID] | None = None 

870 self._peeled_refs: dict[Ref, ObjectID] | None = None 

871 

872 def __repr__(self) -> str: 

873 """Return string representation of DiskRefsContainer.""" 

874 return f"{self.__class__.__name__}({self.path!r})" 

875 

876 def _iter_dir( 

877 self, 

878 path: bytes, 

879 base: bytes, 

880 dir_filter: Callable[[bytes], bool] | None = None, 

881 ) -> Iterator[Ref]: 

882 refspath = os.path.join(path, base.rstrip(b"/")) 

883 prefix_len = len(os.path.join(path, b"")) 

884 

885 for root, dirs, files in os.walk(refspath): 

886 directory = root[prefix_len:] 

887 if os.path.sep != "/": 

888 directory = directory.replace(os.fsencode(os.path.sep), b"/") 

889 if dir_filter is not None: 

890 dirs[:] = [ 

891 d for d in dirs if dir_filter(b"/".join([directory, d, b""])) 

892 ] 

893 

894 for filename in files: 

895 refname = b"/".join([directory, filename]) 

896 if check_ref_format(Ref(refname)): 

897 yield Ref(refname) 

898 

899 def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[Ref]: 

900 base = base.rstrip(b"/") + b"/" 

901 search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = [] 

902 if base != b"refs/": 

903 path = self.worktree_path if is_per_worktree_ref(base) else self.path 

904 search_paths.append((path, None)) 

905 elif self.worktree_path == self.path: 

906 # Iterate through all the refs from the main worktree 

907 search_paths.append((self.path, None)) 

908 else: 

909 # Iterate through all the shared refs from the commondir, excluding per-worktree refs 

910 search_paths.append((self.path, lambda r: not is_per_worktree_ref(r))) 

911 # Iterate through all the per-worktree refs from the worktree's gitdir 

912 search_paths.append((self.worktree_path, is_per_worktree_ref)) 

913 

914 for path, dir_filter in search_paths: 

915 yield from self._iter_dir(path, base, dir_filter=dir_filter) 

916 

917 def subkeys(self, base: Ref) -> set[Ref]: 

918 """Return subkeys under a given base reference path.""" 

919 subkeys: set[Ref] = set() 

920 

921 for key in self._iter_loose_refs(base): 

922 if key.startswith(base): 

923 subkeys.add(Ref(key[len(base) :].strip(b"/"))) 

924 

925 for key in self.get_packed_refs(): 

926 if key.startswith(base): 

927 subkeys.add(Ref(key[len(base) :].strip(b"/"))) 

928 return subkeys 

929 

930 def allkeys(self) -> set[Ref]: 

931 """Return all reference keys.""" 

932 allkeys: set[Ref] = set() 

933 if os.path.exists(self.refpath(HEADREF)): 

934 allkeys.add(Ref(HEADREF)) 

935 

936 allkeys.update(self._iter_loose_refs()) 

937 allkeys.update(self.get_packed_refs()) 

938 return allkeys 

939 

940 def refpath(self, name: bytes) -> bytes: 

941 """Return the disk path of a ref.""" 

942 path = name 

943 if os.path.sep != "/": 

944 path = path.replace(b"/", os.fsencode(os.path.sep)) 

945 

946 root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path 

947 return os.path.join(root_dir, path) 

948 

949 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

950 """Get contents of the packed-refs file. 

951 

952 Returns: Dictionary mapping ref names to SHA1s 

953 

954 Note: Will return an empty dictionary when no packed-refs file is 

955 present. 

956 """ 

957 # TODO: invalidate the cache on repacking 

958 if self._packed_refs is None: 

959 # set both to empty because we want _peeled_refs to be 

960 # None if and only if _packed_refs is also None. 

961 self._packed_refs = {} 

962 self._peeled_refs = {} 

963 path = os.path.join(self.path, b"packed-refs") 

964 try: 

965 f = GitFile(path, "rb") 

966 except FileNotFoundError: 

967 return {} 

968 with f: 

969 first_line = next(iter(f)).rstrip() 

970 if first_line.startswith(b"# pack-refs") and b" peeled" in first_line: 

971 for sha, name, peeled in read_packed_refs_with_peeled(f): 

972 self._packed_refs[name] = sha 

973 if peeled: 

974 self._peeled_refs[name] = peeled 

975 else: 

976 f.seek(0) 

977 for sha, name in read_packed_refs(f): 

978 self._packed_refs[name] = sha 

979 return self._packed_refs 

980 

981 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

982 """Add the given refs as packed refs. 

983 

984 Args: 

985 new_refs: A mapping of ref names to targets; if a target is None that 

986 means remove the ref 

987 """ 

988 if not new_refs: 

989 return 

990 

991 path = os.path.join(self.path, b"packed-refs") 

992 

993 with GitFile(path, "wb") as f: 

994 # reread cached refs from disk, while holding the lock 

995 packed_refs = self.get_packed_refs().copy() 

996 

997 for ref, target in new_refs.items(): 

998 # sanity check 

999 if ref == HEADREF: 

1000 raise ValueError("cannot pack HEAD") 

1001 

1002 # remove any loose refs pointing to this one -- please 

1003 # note that this bypasses remove_if_equals as we don't 

1004 # want to affect packed refs in here 

1005 with suppress(OSError): 

1006 os.remove(self.refpath(ref)) 

1007 

1008 if target is not None: 

1009 packed_refs[ref] = target 

1010 else: 

1011 packed_refs.pop(ref, None) 

1012 

1013 write_packed_refs(f, packed_refs, self._peeled_refs) 

1014 

1015 self._packed_refs = packed_refs 

1016 

1017 def get_peeled(self, name: Ref) -> ObjectID | None: 

1018 """Return the cached peeled value of a ref, if available. 

1019 

1020 Args: 

1021 name: Name of the ref to peel 

1022 Returns: The peeled value of the ref. If the ref is known not point to 

1023 a tag, this will be the SHA the ref refers to. If the ref may point 

1024 to a tag, but no cached information is available, None is returned. 

1025 """ 

1026 self.get_packed_refs() 

1027 if ( 

1028 self._peeled_refs is None 

1029 or self._packed_refs is None 

1030 or name not in self._packed_refs 

1031 ): 

1032 # No cache: no peeled refs were read, or this ref is loose 

1033 return None 

1034 if name in self._peeled_refs: 

1035 return self._peeled_refs[name] 

1036 else: 

1037 # Known not peelable 

1038 return self[name] 

1039 

1040 def read_loose_ref(self, name: Ref) -> bytes | None: 

1041 """Read a reference file and return its contents. 

1042 

1043 If the reference file a symbolic reference, only read the first line of 

1044 the file. Otherwise, read the hash (40 bytes for SHA1, 64 bytes for SHA256). 

1045 

1046 Args: 

1047 name: the refname to read, relative to refpath 

1048 Returns: The contents of the ref file, or None if the file does not 

1049 exist. 

1050 

1051 Raises: 

1052 IOError: if any other error occurs 

1053 """ 

1054 filename = self.refpath(name) 

1055 try: 

1056 with GitFile(filename, "rb") as f: 

1057 header = f.read(len(SYMREF)) 

1058 if header == SYMREF: 

1059 # Read only the first line 

1060 return header + next(iter(f)).rstrip(b"\r\n") 

1061 else: 

1062 # Read the entire line to get the full hash (handles both SHA1 and SHA256) 

1063 f.seek(0) 

1064 line = f.readline().rstrip(b"\r\n") 

1065 return line 

1066 except (OSError, UnicodeError): 

1067 # don't assume anything specific about the error; in 

1068 # particular, invalid or forbidden paths can raise weird 

1069 # errors depending on the specific operating system 

1070 return None 

1071 

1072 def _remove_packed_ref(self, name: Ref) -> None: 

1073 if self._packed_refs is None: 

1074 return 

1075 filename = os.path.join(self.path, b"packed-refs") 

1076 # reread cached refs from disk, while holding the lock 

1077 f = GitFile(filename, "wb") 

1078 try: 

1079 self._packed_refs = None 

1080 self.get_packed_refs() 

1081 

1082 if self._packed_refs is None or name not in self._packed_refs: 

1083 f.abort() 

1084 return 

1085 

1086 del self._packed_refs[name] 

1087 if self._peeled_refs is not None: 

1088 with suppress(KeyError): 

1089 del self._peeled_refs[name] 

1090 write_packed_refs(f, self._packed_refs, self._peeled_refs) 

1091 f.close() 

1092 except BaseException: 

1093 f.abort() 

1094 raise 

1095 

1096 def set_symbolic_ref( 

1097 self, 

1098 name: Ref, 

1099 other: Ref, 

1100 committer: bytes | None = None, 

1101 timestamp: int | None = None, 

1102 timezone: int | None = None, 

1103 message: bytes | None = None, 

1104 ) -> None: 

1105 """Make a ref point at another ref. 

1106 

1107 Args: 

1108 name: Name of the ref to set 

1109 other: Name of the ref to point at 

1110 committer: Optional committer name 

1111 timestamp: Optional timestamp 

1112 timezone: Optional timezone 

1113 message: Optional message to describe the change 

1114 """ 

1115 self._check_refname(name) 

1116 self._check_refname(other) 

1117 filename = self.refpath(name) 

1118 f = GitFile(filename, "wb") 

1119 try: 

1120 f.write(SYMREF + other + b"\n") 

1121 sha = self.follow(name)[-1] 

1122 self._log( 

1123 name, 

1124 sha, 

1125 sha, 

1126 committer=committer, 

1127 timestamp=timestamp, 

1128 timezone=timezone, 

1129 message=message, 

1130 ) 

1131 except BaseException: 

1132 f.abort() 

1133 raise 

1134 else: 

1135 f.close() 

1136 

1137 def set_if_equals( 

1138 self, 

1139 name: Ref, 

1140 old_ref: ObjectID | None, 

1141 new_ref: ObjectID, 

1142 committer: bytes | None = None, 

1143 timestamp: int | None = None, 

1144 timezone: int | None = None, 

1145 message: bytes | None = None, 

1146 ) -> bool: 

1147 """Set a refname to new_ref only if it currently equals old_ref. 

1148 

1149 This method follows all symbolic references, and can be used to perform 

1150 an atomic compare-and-swap operation. 

1151 

1152 Args: 

1153 name: The refname to set. 

1154 old_ref: The old sha the refname must refer to, or None to set 

1155 unconditionally. 

1156 new_ref: The new sha the refname will refer to. 

1157 committer: Optional committer name 

1158 timestamp: Optional timestamp 

1159 timezone: Optional timezone 

1160 message: Set message for reflog 

1161 Returns: True if the set was successful, False otherwise. 

1162 """ 

1163 self._check_refname(name) 

1164 try: 

1165 realnames, _ = self.follow(name) 

1166 realname = realnames[-1] 

1167 except (KeyError, IndexError, SymrefLoop): 

1168 realname = name 

1169 filename = self.refpath(realname) 

1170 

1171 # make sure none of the ancestor folders is in packed refs 

1172 probe_ref = Ref(os.path.dirname(realname)) 

1173 packed_refs = self.get_packed_refs() 

1174 while probe_ref: 

1175 if packed_refs.get(probe_ref, None) is not None: 

1176 raise NotADirectoryError(filename) 

1177 probe_ref = Ref(os.path.dirname(probe_ref)) 

1178 

1179 ensure_dir_exists(os.path.dirname(filename)) 

1180 with GitFile(filename, "wb") as f: 

1181 if old_ref is not None: 

1182 try: 

1183 # read again while holding the lock to handle race conditions 

1184 orig_ref = self.read_loose_ref(realname) 

1185 if orig_ref is None: 

1186 orig_ref = self.get_packed_refs().get(realname, ZERO_SHA) 

1187 if orig_ref != old_ref: 

1188 f.abort() 

1189 return False 

1190 except OSError: 

1191 f.abort() 

1192 raise 

1193 

1194 # Check if ref already has the desired value while holding the lock 

1195 # This avoids fsync when ref is unchanged but still detects lock conflicts 

1196 current_ref = self.read_loose_ref(realname) 

1197 if current_ref is None: 

1198 current_ref = packed_refs.get(realname, None) 

1199 

1200 if current_ref is not None and current_ref == new_ref: 

1201 # Ref already has desired value, abort write to avoid fsync 

1202 f.abort() 

1203 return True 

1204 

1205 try: 

1206 f.write(new_ref + b"\n") 

1207 except OSError: 

1208 f.abort() 

1209 raise 

1210 self._log( 

1211 realname, 

1212 old_ref, 

1213 new_ref, 

1214 committer=committer, 

1215 timestamp=timestamp, 

1216 timezone=timezone, 

1217 message=message, 

1218 ) 

1219 return True 

1220 

1221 def add_if_new( 

1222 self, 

1223 name: Ref, 

1224 ref: ObjectID, 

1225 committer: bytes | None = None, 

1226 timestamp: int | None = None, 

1227 timezone: int | None = None, 

1228 message: bytes | None = None, 

1229 ) -> bool: 

1230 """Add a new reference only if it does not already exist. 

1231 

1232 This method follows symrefs, and only ensures that the last ref in the 

1233 chain does not exist. 

1234 

1235 Args: 

1236 name: The refname to set. 

1237 ref: The new sha the refname will refer to. 

1238 committer: Optional committer name 

1239 timestamp: Optional timestamp 

1240 timezone: Optional timezone 

1241 message: Optional message for reflog 

1242 Returns: True if the add was successful, False otherwise. 

1243 """ 

1244 try: 

1245 realnames, contents = self.follow(name) 

1246 if contents is not None: 

1247 return False 

1248 realname = realnames[-1] 

1249 except (KeyError, IndexError): 

1250 realname = name 

1251 self._check_refname(realname) 

1252 filename = self.refpath(realname) 

1253 ensure_dir_exists(os.path.dirname(filename)) 

1254 with GitFile(filename, "wb") as f: 

1255 if os.path.exists(filename) or name in self.get_packed_refs(): 

1256 f.abort() 

1257 return False 

1258 try: 

1259 f.write(ref + b"\n") 

1260 except OSError: 

1261 f.abort() 

1262 raise 

1263 else: 

1264 self._log( 

1265 name, 

1266 None, 

1267 ref, 

1268 committer=committer, 

1269 timestamp=timestamp, 

1270 timezone=timezone, 

1271 message=message, 

1272 ) 

1273 return True 

1274 

1275 def remove_if_equals( 

1276 self, 

1277 name: Ref, 

1278 old_ref: ObjectID | None, 

1279 committer: bytes | None = None, 

1280 timestamp: int | None = None, 

1281 timezone: int | None = None, 

1282 message: bytes | None = None, 

1283 ) -> bool: 

1284 """Remove a refname only if it currently equals old_ref. 

1285 

1286 This method does not follow symbolic references. It can be used to 

1287 perform an atomic compare-and-delete operation. 

1288 

1289 Args: 

1290 name: The refname to delete. 

1291 old_ref: The old sha the refname must refer to, or None to 

1292 delete unconditionally. 

1293 committer: Optional committer name 

1294 timestamp: Optional timestamp 

1295 timezone: Optional timezone 

1296 message: Optional message 

1297 Returns: True if the delete was successful, False otherwise. 

1298 """ 

1299 self._check_refname(name) 

1300 filename = self.refpath(name) 

1301 ensure_dir_exists(os.path.dirname(filename)) 

1302 f = GitFile(filename, "wb") 

1303 try: 

1304 if old_ref is not None: 

1305 orig_ref = self.read_loose_ref(name) 

1306 if orig_ref is None: 

1307 orig_ref = self.get_packed_refs().get(name) 

1308 if orig_ref is None: 

1309 orig_ref = ZERO_SHA 

1310 if orig_ref != old_ref: 

1311 return False 

1312 

1313 # remove the reference file itself 

1314 try: 

1315 found = os.path.lexists(filename) 

1316 except OSError: 

1317 # may only be packed, or otherwise unstorable 

1318 found = False 

1319 

1320 if found: 

1321 os.remove(filename) 

1322 

1323 self._remove_packed_ref(name) 

1324 self._log( 

1325 name, 

1326 old_ref, 

1327 None, 

1328 committer=committer, 

1329 timestamp=timestamp, 

1330 timezone=timezone, 

1331 message=message, 

1332 ) 

1333 finally: 

1334 # never write, we just wanted the lock 

1335 f.abort() 

1336 

1337 # outside of the lock, clean-up any parent directory that might now 

1338 # be empty. this ensures that re-creating a reference of the same 

1339 # name of what was previously a directory works as expected 

1340 parent = name 

1341 while True: 

1342 try: 

1343 parent_bytes, _ = parent.rsplit(b"/", 1) 

1344 parent = Ref(parent_bytes) 

1345 except ValueError: 

1346 break 

1347 

1348 if parent == b"refs": 

1349 break 

1350 parent_filename = self.refpath(parent) 

1351 try: 

1352 os.rmdir(parent_filename) 

1353 except OSError: 

1354 # this can be caused by the parent directory being 

1355 # removed by another process, being not empty, etc. 

1356 # in any case, this is non fatal because we already 

1357 # removed the reference, just ignore it 

1358 break 

1359 

1360 return True 

1361 

1362 def pack_refs(self, all: bool = False) -> None: 

1363 """Pack loose refs into packed-refs file. 

1364 

1365 Args: 

1366 all: If True, pack all refs. If False, only pack tags. 

1367 """ 

1368 refs_to_pack: dict[Ref, ObjectID | None] = {} 

1369 for ref in self.allkeys(): 

1370 if ref == HEADREF: 

1371 # Never pack HEAD 

1372 continue 

1373 if all or ref.startswith(LOCAL_TAG_PREFIX): 

1374 try: 

1375 sha = self[ref] 

1376 if sha: 

1377 refs_to_pack[ref] = sha 

1378 except KeyError: 

1379 # Broken ref, skip it 

1380 pass 

1381 

1382 if refs_to_pack: 

1383 self.add_packed_refs(refs_to_pack) 

1384 

1385 

1386def _split_ref_line(line: bytes) -> tuple[ObjectID, Ref]: 

1387 """Split a single ref line into a tuple of SHA1 and name.""" 

1388 fields = line.rstrip(b"\n\r").split(b" ") 

1389 if len(fields) != 2: 

1390 raise PackedRefsException(f"invalid ref line {line!r}") 

1391 sha, name = fields 

1392 if not valid_hexsha(sha): 

1393 raise PackedRefsException(f"Invalid hex sha {sha!r}") 

1394 if not check_ref_format(Ref(name)): 

1395 raise PackedRefsException(f"invalid ref name {name!r}") 

1396 return (ObjectID(sha), Ref(name)) 

1397 

1398 

1399def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[ObjectID, Ref]]: 

1400 """Read a packed refs file. 

1401 

1402 Args: 

1403 f: file-like object to read from 

1404 Returns: Iterator over tuples with SHA1s and ref names. 

1405 """ 

1406 for line in f: 

1407 if line.startswith(b"#"): 

1408 # Comment 

1409 continue 

1410 if line.startswith(b"^"): 

1411 raise PackedRefsException("found peeled ref in packed-refs without peeled") 

1412 yield _split_ref_line(line) 

1413 

1414 

1415def read_packed_refs_with_peeled( 

1416 f: IO[bytes], 

1417) -> Iterator[tuple[ObjectID, Ref, ObjectID | None]]: 

1418 """Read a packed refs file including peeled refs. 

1419 

1420 Assumes the "# pack-refs with: peeled" line was already read. Yields tuples 

1421 with ref names, SHA1s, and peeled SHA1s (or None). 

1422 

1423 Args: 

1424 f: file-like object to read from, seek'ed to the second line 

1425 """ 

1426 last = None 

1427 for line in f: 

1428 if line.startswith(b"#"): 

1429 continue 

1430 line = line.rstrip(b"\r\n") 

1431 if line.startswith(b"^"): 

1432 if not last: 

1433 raise PackedRefsException("unexpected peeled ref line") 

1434 if not valid_hexsha(line[1:]): 

1435 raise PackedRefsException(f"Invalid hex sha {line[1:]!r}") 

1436 sha, name = _split_ref_line(last) 

1437 last = None 

1438 yield (sha, name, ObjectID(line[1:])) 

1439 else: 

1440 if last: 

1441 sha, name = _split_ref_line(last) 

1442 yield (sha, name, None) 

1443 last = line 

1444 if last: 

1445 sha, name = _split_ref_line(last) 

1446 yield (sha, name, None) 

1447 

1448 

1449def write_packed_refs( 

1450 f: IO[bytes], 

1451 packed_refs: Mapping[Ref, ObjectID], 

1452 peeled_refs: Mapping[Ref, ObjectID] | None = None, 

1453) -> None: 

1454 """Write a packed refs file. 

1455 

1456 Args: 

1457 f: empty file-like object to write to 

1458 packed_refs: dict of refname to sha of packed refs to write 

1459 peeled_refs: dict of refname to peeled value of sha 

1460 """ 

1461 if peeled_refs is None: 

1462 peeled_refs = {} 

1463 else: 

1464 f.write(b"# pack-refs with: peeled\n") 

1465 for refname in sorted(packed_refs.keys()): 

1466 f.write(git_line(packed_refs[refname], refname)) 

1467 if refname in peeled_refs: 

1468 f.write(b"^" + peeled_refs[refname] + b"\n") 

1469 

1470 

1471def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]: 

1472 """Read info/refs file. 

1473 

1474 Args: 

1475 f: File-like object to read from 

1476 

1477 Returns: 

1478 Dictionary mapping ref names to SHA1s 

1479 """ 

1480 ret: dict[Ref, ObjectID] = {} 

1481 for line_no, line in enumerate(f.readlines(), 1): 

1482 stripped = line.rstrip(b"\r\n") 

1483 parts = stripped.split(b"\t", 1) 

1484 if len(parts) != 2: 

1485 raise ValueError( 

1486 f"Invalid info/refs format at line {line_no}: " 

1487 f"expected '<sha>\\t<refname>', got {stripped[:100]!r}" 

1488 ) 

1489 (sha, name) = parts 

1490 ret[Ref(name)] = ObjectID(sha) 

1491 return ret 

1492 

1493 

1494def is_local_branch(x: bytes) -> bool: 

1495 """Check if a ref name is a local branch.""" 

1496 return x.startswith(LOCAL_BRANCH_PREFIX) 

1497 

1498 

1499def _strip_leading_slash(name: bytes, kind: str) -> bytes: 

1500 """Strip a leading slash from a short ref name, warning if one is present. 

1501 

1502 A leading slash here means the caller stripped a ref prefix incorrectly 

1503 (e.g. used ``ref[len(b"refs/tags"):]`` instead of ``len(b"refs/tags/")``). 

1504 Joining such a name with a prefix would produce a malformed ref with an 

1505 empty path component. Warn rather than raise for now; this will become an 

1506 error in a future release. 

1507 """ 

1508 if not name.startswith(b"/"): 

1509 return name 

1510 warnings.warn( 

1511 f"{kind} name must not start with a slash: {name!r}; " 

1512 "this will be rejected in a future version of Dulwich.", 

1513 DeprecationWarning, 

1514 stacklevel=3, 

1515 ) 

1516 return name.lstrip(b"/") 

1517 

1518 

1519def local_branch_name(name: bytes) -> Ref: 

1520 """Build a full branch ref from a short name. 

1521 

1522 Args: 

1523 name: Short branch name (e.g., b"master") or full ref 

1524 

1525 Returns: 

1526 Full branch ref name (e.g., b"refs/heads/master") 

1527 

1528 Examples: 

1529 >>> local_branch_name(b"master") 

1530 b'refs/heads/master' 

1531 >>> local_branch_name(b"refs/heads/master") 

1532 b'refs/heads/master' 

1533 """ 

1534 if name.startswith(LOCAL_BRANCH_PREFIX): 

1535 return Ref(name) 

1536 return Ref(LOCAL_BRANCH_PREFIX + _strip_leading_slash(name, "Branch")) 

1537 

1538 

1539def local_tag_name(name: bytes) -> Ref: 

1540 """Build a full tag ref from a short name. 

1541 

1542 Args: 

1543 name: Short tag name (e.g., b"v1.0") or full ref 

1544 

1545 Returns: 

1546 Full tag ref name (e.g., b"refs/tags/v1.0") 

1547 

1548 Examples: 

1549 >>> local_tag_name(b"v1.0") 

1550 b'refs/tags/v1.0' 

1551 >>> local_tag_name(b"refs/tags/v1.0") 

1552 b'refs/tags/v1.0' 

1553 """ 

1554 if name.startswith(LOCAL_TAG_PREFIX): 

1555 return Ref(name) 

1556 return Ref(LOCAL_TAG_PREFIX + _strip_leading_slash(name, "Tag")) 

1557 

1558 

1559def local_replace_name(name: bytes) -> Ref: 

1560 """Build a full replace ref from a short name. 

1561 

1562 Args: 

1563 name: Short replace name (object SHA) or full ref 

1564 

1565 Returns: 

1566 Full replace ref name (e.g., b"refs/replace/<sha>") 

1567 

1568 Examples: 

1569 >>> local_replace_name(b"abc123") 

1570 b'refs/replace/abc123' 

1571 >>> local_replace_name(b"refs/replace/abc123") 

1572 b'refs/replace/abc123' 

1573 """ 

1574 if name.startswith(LOCAL_REPLACE_PREFIX): 

1575 return Ref(name) 

1576 return Ref(LOCAL_REPLACE_PREFIX + _strip_leading_slash(name, "Replace")) 

1577 

1578 

1579def extract_branch_name(ref: bytes) -> bytes: 

1580 """Extract branch name from a full branch ref. 

1581 

1582 Args: 

1583 ref: Full branch ref (e.g., b"refs/heads/master") 

1584 

1585 Returns: 

1586 Short branch name (e.g., b"master") 

1587 

1588 Raises: 

1589 ValueError: If ref is not a local branch 

1590 

1591 Examples: 

1592 >>> extract_branch_name(b"refs/heads/master") 

1593 b'master' 

1594 >>> extract_branch_name(b"refs/heads/feature/foo") 

1595 b'feature/foo' 

1596 """ 

1597 if not ref.startswith(LOCAL_BRANCH_PREFIX): 

1598 raise ValueError(f"Not a local branch ref: {ref!r}") 

1599 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1600 

1601 

1602def extract_tag_name(ref: bytes) -> bytes: 

1603 """Extract tag name from a full tag ref. 

1604 

1605 Args: 

1606 ref: Full tag ref (e.g., b"refs/tags/v1.0") 

1607 

1608 Returns: 

1609 Short tag name (e.g., b"v1.0") 

1610 

1611 Raises: 

1612 ValueError: If ref is not a local tag 

1613 

1614 Examples: 

1615 >>> extract_tag_name(b"refs/tags/v1.0") 

1616 b'v1.0' 

1617 """ 

1618 if not ref.startswith(LOCAL_TAG_PREFIX): 

1619 raise ValueError(f"Not a local tag ref: {ref!r}") 

1620 return ref[len(LOCAL_TAG_PREFIX) :] 

1621 

1622 

1623def shorten_ref_name(ref: bytes) -> bytes: 

1624 """Convert a full ref name to its short form. 

1625 

1626 Args: 

1627 ref: Full ref name (e.g., b"refs/heads/master") 

1628 

1629 Returns: 

1630 Short ref name (e.g., b"master") 

1631 

1632 Examples: 

1633 >>> shorten_ref_name(b"refs/heads/master") 

1634 b'master' 

1635 >>> shorten_ref_name(b"refs/remotes/origin/main") 

1636 b'origin/main' 

1637 >>> shorten_ref_name(b"refs/tags/v1.0") 

1638 b'v1.0' 

1639 >>> shorten_ref_name(b"HEAD") 

1640 b'HEAD' 

1641 """ 

1642 if ref.startswith(LOCAL_BRANCH_PREFIX): 

1643 return ref[len(LOCAL_BRANCH_PREFIX) :] 

1644 elif ref.startswith(LOCAL_REMOTE_PREFIX): 

1645 return ref[len(LOCAL_REMOTE_PREFIX) :] 

1646 elif ref.startswith(LOCAL_TAG_PREFIX): 

1647 return ref[len(LOCAL_TAG_PREFIX) :] 

1648 return ref 

1649 

1650 

1651def _set_origin_head( 

1652 refs: RefsContainer, origin: bytes, origin_head: bytes | None 

1653) -> None: 

1654 # set refs/remotes/origin/HEAD 

1655 origin_base = b"refs/remotes/" + origin + b"/" 

1656 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1657 origin_ref = Ref(origin_base + HEADREF) 

1658 target_ref = Ref(origin_base + extract_branch_name(origin_head)) 

1659 if target_ref in refs: 

1660 refs.set_symbolic_ref(origin_ref, target_ref) 

1661 

1662 

1663def _set_default_branch( 

1664 refs: RefsContainer, 

1665 origin: bytes, 

1666 origin_head: bytes | None, 

1667 branch: bytes | None, 

1668 ref_message: bytes | None, 

1669) -> bytes: 

1670 """Set the default branch.""" 

1671 origin_base = b"refs/remotes/" + origin + b"/" 

1672 if branch: 

1673 origin_ref = Ref(origin_base + branch) 

1674 if origin_ref in refs: 

1675 local_ref = Ref(local_branch_name(branch)) 

1676 refs.add_if_new(local_ref, refs[origin_ref], ref_message) 

1677 head_ref = local_ref 

1678 elif Ref(local_tag_name(branch)) in refs: 

1679 head_ref = Ref(local_tag_name(branch)) 

1680 else: 

1681 raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag") 

1682 elif origin_head: 

1683 head_ref = Ref(origin_head) 

1684 if origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1685 origin_ref = Ref(origin_base + extract_branch_name(origin_head)) 

1686 else: 

1687 origin_ref = Ref(origin_head) 

1688 try: 

1689 refs.add_if_new(head_ref, refs[origin_ref], ref_message) 

1690 except KeyError: 

1691 pass 

1692 else: 

1693 raise ValueError("neither origin_head nor branch are provided") 

1694 return head_ref 

1695 

1696 

1697def _set_head( 

1698 refs: RefsContainer, head_ref: bytes, ref_message: bytes | None 

1699) -> ObjectID | None: 

1700 if head_ref.startswith(LOCAL_TAG_PREFIX): 

1701 # detach HEAD at specified tag 

1702 head = refs[Ref(head_ref)] 

1703 if isinstance(head, Tag): 

1704 _cls, obj = head.object 

1705 head = obj.get_object(obj).id 

1706 del refs[HEADREF] 

1707 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1708 else: 

1709 # set HEAD to specific branch 

1710 try: 

1711 head = refs[Ref(head_ref)] 

1712 refs.set_symbolic_ref(HEADREF, Ref(head_ref)) 

1713 refs.set_if_equals(HEADREF, None, head, message=ref_message) 

1714 except KeyError: 

1715 head = None 

1716 return head 

1717 

1718 

1719def _import_remote_refs( 

1720 refs_container: RefsContainer, 

1721 remote_name: str, 

1722 refs: Mapping[Ref, ObjectID | None], 

1723 message: bytes | None = None, 

1724 prune: bool = False, 

1725 prune_tags: bool = False, 

1726) -> None: 

1727 from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs 

1728 

1729 stripped_refs = strip_peeled_refs(refs) 

1730 branches: dict[Ref, ObjectID | None] = { 

1731 Ref(extract_branch_name(n)): v 

1732 for (n, v) in stripped_refs.items() 

1733 if n.startswith(LOCAL_BRANCH_PREFIX) 

1734 } 

1735 refs_container.import_refs( 

1736 Ref(b"refs/remotes/" + remote_name.encode()), 

1737 branches, 

1738 message=message, 

1739 prune=prune, 

1740 ) 

1741 tags: dict[Ref, ObjectID | None] = { 

1742 Ref(extract_tag_name(n)): v 

1743 for (n, v) in stripped_refs.items() 

1744 if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX) 

1745 } 

1746 refs_container.import_refs( 

1747 Ref(b"refs/tags"), tags, message=message, prune=prune_tags 

1748 ) 

1749 

1750 

1751class locked_ref: 

1752 """Lock a ref while making modifications. 

1753 

1754 Works as a context manager. 

1755 """ 

1756 

1757 def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None: 

1758 """Initialize a locked ref. 

1759 

1760 Args: 

1761 refs_container: The DiskRefsContainer to lock the ref in 

1762 refname: The ref name to lock 

1763 """ 

1764 self._refs_container = refs_container 

1765 self._refname = refname 

1766 self._file: _GitFile | None = None 

1767 self._realname: Ref | None = None 

1768 self._deleted = False 

1769 

1770 def __enter__(self) -> Self: 

1771 """Enter the context manager and acquire the lock. 

1772 

1773 Returns: 

1774 This locked_ref instance 

1775 

1776 Raises: 

1777 OSError: If the lock cannot be acquired 

1778 """ 

1779 self._refs_container._check_refname(self._refname) 

1780 try: 

1781 realnames, _ = self._refs_container.follow(self._refname) 

1782 self._realname = realnames[-1] 

1783 except (KeyError, IndexError, SymrefLoop): 

1784 self._realname = self._refname 

1785 

1786 filename = self._refs_container.refpath(self._realname) 

1787 ensure_dir_exists(os.path.dirname(filename)) 

1788 f = GitFile(filename, "wb") 

1789 self._file = f 

1790 return self 

1791 

1792 def __exit__( 

1793 self, 

1794 exc_type: type | None, 

1795 exc_value: BaseException | None, 

1796 traceback: types.TracebackType | None, 

1797 ) -> None: 

1798 """Exit the context manager and release the lock. 

1799 

1800 Args: 

1801 exc_type: Type of exception if one occurred 

1802 exc_value: Exception instance if one occurred 

1803 traceback: Traceback if an exception occurred 

1804 """ 

1805 if self._file: 

1806 if exc_type is not None or self._deleted: 

1807 self._file.abort() 

1808 else: 

1809 self._file.close() 

1810 

1811 def get(self) -> bytes | None: 

1812 """Get the current value of the ref.""" 

1813 if not self._file: 

1814 raise RuntimeError("locked_ref not in context") 

1815 

1816 assert self._realname is not None 

1817 current_ref = self._refs_container.read_loose_ref(self._realname) 

1818 if current_ref is None: 

1819 current_ref = self._refs_container.get_packed_refs().get( 

1820 self._realname, None 

1821 ) 

1822 return current_ref 

1823 

1824 def ensure_equals(self, expected_value: bytes | None) -> bool: 

1825 """Ensure the ref currently equals the expected value. 

1826 

1827 Args: 

1828 expected_value: The expected current value of the ref 

1829 Returns: 

1830 True if the ref equals the expected value, False otherwise 

1831 """ 

1832 current_value = self.get() 

1833 return current_value == expected_value 

1834 

1835 def set(self, new_ref: bytes) -> None: 

1836 """Set the ref to a new value. 

1837 

1838 Args: 

1839 new_ref: The new SHA1 or symbolic ref value 

1840 """ 

1841 if not self._file: 

1842 raise RuntimeError("locked_ref not in context") 

1843 

1844 if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)): 

1845 raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref") 

1846 

1847 self._file.seek(0) 

1848 self._file.truncate() 

1849 self._file.write(new_ref + b"\n") 

1850 self._deleted = False 

1851 

1852 def set_symbolic_ref(self, target: Ref) -> None: 

1853 """Make this ref point at another ref. 

1854 

1855 Args: 

1856 target: Name of the ref to point at 

1857 """ 

1858 if not self._file: 

1859 raise RuntimeError("locked_ref not in context") 

1860 

1861 self._refs_container._check_refname(target) 

1862 self._file.seek(0) 

1863 self._file.truncate() 

1864 self._file.write(SYMREF + target + b"\n") 

1865 self._deleted = False 

1866 

1867 def delete(self) -> None: 

1868 """Delete the ref file while holding the lock.""" 

1869 if not self._file: 

1870 raise RuntimeError("locked_ref not in context") 

1871 

1872 # Delete the actual ref file while holding the lock 

1873 if self._realname: 

1874 filename = self._refs_container.refpath(self._realname) 

1875 try: 

1876 if os.path.lexists(filename): 

1877 os.remove(filename) 

1878 except FileNotFoundError: 

1879 pass 

1880 self._refs_container._remove_packed_ref(self._realname) 

1881 

1882 self._deleted = True 

1883 

1884 

1885class NamespacedRefsContainer(RefsContainer): 

1886 """Wrapper that adds namespace prefix to all ref operations. 

1887 

1888 This implements Git's GIT_NAMESPACE feature, which stores refs under 

1889 refs/namespaces/<namespace>/ and filters operations to only show refs 

1890 within that namespace. 

1891 

1892 Example: 

1893 With namespace "foo", a ref "refs/heads/master" is stored as 

1894 "refs/namespaces/foo/refs/heads/master" in the underlying container. 

1895 """ 

1896 

1897 def __init__(self, refs: RefsContainer, namespace: bytes) -> None: 

1898 """Initialize NamespacedRefsContainer. 

1899 

1900 Args: 

1901 refs: The underlying refs container to wrap 

1902 namespace: The namespace prefix (e.g., b"foo" or b"foo/bar") 

1903 """ 

1904 super().__init__(logger=refs._logger) 

1905 self._refs = refs 

1906 # Build namespace prefix: refs/namespaces/<namespace>/ 

1907 # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/ 

1908 namespace_parts = namespace.split(b"/") 

1909 self._namespace_prefix = b"" 

1910 for part in namespace_parts: 

1911 self._namespace_prefix += b"refs/namespaces/" + part + b"/" 

1912 

1913 def _apply_namespace(self, name: bytes) -> bytes: 

1914 """Apply namespace prefix to a ref name.""" 

1915 # HEAD and other special refs are not namespaced 

1916 if name == HEADREF or not name.startswith(b"refs/"): 

1917 return name 

1918 return self._namespace_prefix + name 

1919 

1920 def _strip_namespace(self, name: bytes) -> bytes | None: 

1921 """Remove namespace prefix from a ref name. 

1922 

1923 Returns None if the ref is not in our namespace. 

1924 """ 

1925 # HEAD and other special refs are not namespaced 

1926 if name == HEADREF or not name.startswith(b"refs/"): 

1927 return name 

1928 if name.startswith(self._namespace_prefix): 

1929 return name[len(self._namespace_prefix) :] 

1930 return None 

1931 

1932 def allkeys(self) -> set[Ref]: 

1933 """Return all reference keys in this namespace.""" 

1934 keys: set[Ref] = set() 

1935 for key in self._refs.allkeys(): 

1936 stripped = self._strip_namespace(key) 

1937 if stripped is not None: 

1938 keys.add(Ref(stripped)) 

1939 return keys 

1940 

1941 def read_loose_ref(self, name: Ref) -> bytes | None: 

1942 """Read a loose reference.""" 

1943 return self._refs.read_loose_ref(Ref(self._apply_namespace(name))) 

1944 

1945 def get_packed_refs(self) -> dict[Ref, ObjectID]: 

1946 """Get packed refs within this namespace.""" 

1947 packed: dict[Ref, ObjectID] = {} 

1948 for name, value in self._refs.get_packed_refs().items(): 

1949 stripped = self._strip_namespace(name) 

1950 if stripped is not None: 

1951 packed[Ref(stripped)] = value 

1952 return packed 

1953 

1954 def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None: 

1955 """Add packed refs with namespace prefix.""" 

1956 namespaced_refs: dict[Ref, ObjectID | None] = { 

1957 Ref(self._apply_namespace(name)): value for name, value in new_refs.items() 

1958 } 

1959 self._refs.add_packed_refs(namespaced_refs) 

1960 

1961 def get_peeled(self, name: Ref) -> ObjectID | None: 

1962 """Return the cached peeled value of a ref.""" 

1963 return self._refs.get_peeled(Ref(self._apply_namespace(name))) 

1964 

1965 def set_symbolic_ref( 

1966 self, 

1967 name: Ref, 

1968 other: Ref, 

1969 committer: bytes | None = None, 

1970 timestamp: int | None = None, 

1971 timezone: int | None = None, 

1972 message: bytes | None = None, 

1973 ) -> None: 

1974 """Make a ref point at another ref.""" 

1975 self._refs.set_symbolic_ref( 

1976 Ref(self._apply_namespace(name)), 

1977 Ref(self._apply_namespace(other)), 

1978 committer=committer, 

1979 timestamp=timestamp, 

1980 timezone=timezone, 

1981 message=message, 

1982 ) 

1983 

1984 def set_if_equals( 

1985 self, 

1986 name: Ref, 

1987 old_ref: ObjectID | None, 

1988 new_ref: ObjectID, 

1989 committer: bytes | None = None, 

1990 timestamp: int | None = None, 

1991 timezone: int | None = None, 

1992 message: bytes | None = None, 

1993 ) -> bool: 

1994 """Set a refname to new_ref only if it currently equals old_ref.""" 

1995 return self._refs.set_if_equals( 

1996 Ref(self._apply_namespace(name)), 

1997 old_ref, 

1998 new_ref, 

1999 committer=committer, 

2000 timestamp=timestamp, 

2001 timezone=timezone, 

2002 message=message, 

2003 ) 

2004 

2005 def add_if_new( 

2006 self, 

2007 name: Ref, 

2008 ref: ObjectID, 

2009 committer: bytes | None = None, 

2010 timestamp: int | None = None, 

2011 timezone: int | None = None, 

2012 message: bytes | None = None, 

2013 ) -> bool: 

2014 """Add a new reference only if it does not already exist.""" 

2015 return self._refs.add_if_new( 

2016 Ref(self._apply_namespace(name)), 

2017 ref, 

2018 committer=committer, 

2019 timestamp=timestamp, 

2020 timezone=timezone, 

2021 message=message, 

2022 ) 

2023 

2024 def remove_if_equals( 

2025 self, 

2026 name: Ref, 

2027 old_ref: ObjectID | None, 

2028 committer: bytes | None = None, 

2029 timestamp: int | None = None, 

2030 timezone: int | None = None, 

2031 message: bytes | None = None, 

2032 ) -> bool: 

2033 """Remove a refname only if it currently equals old_ref.""" 

2034 return self._refs.remove_if_equals( 

2035 Ref(self._apply_namespace(name)), 

2036 old_ref, 

2037 committer=committer, 

2038 timestamp=timestamp, 

2039 timezone=timezone, 

2040 message=message, 

2041 ) 

2042 

2043 def pack_refs(self, all: bool = False) -> None: 

2044 """Pack loose refs into packed-refs file. 

2045 

2046 Note: This packs all refs in the underlying container, not just 

2047 those in the namespace. 

2048 """ 

2049 self._refs.pack_refs(all=all) 

2050 

2051 

2052def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T: 

2053 """Filter refs to only include those with a given prefix. 

2054 

2055 Args: 

2056 refs: A dictionary of refs. 

2057 prefixes: The prefixes to filter by. 

2058 """ 

2059 filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)} 

2060 return filtered 

2061 

2062 

2063def is_per_worktree_ref(ref: bytes) -> bool: 

2064 """Returns whether a reference is stored per worktree or not. 

2065 

2066 Per-worktree references are: 

2067 - all pseudorefs, e.g. HEAD 

2068 - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/" 

2069 

2070 All refs starting with "refs/" are shared, except for the ones listed above. 

2071 

2072 See https://git-scm.com/docs/git-worktree#_refs. 

2073 """ 

2074 return not ref.startswith(b"refs/") or ref.startswith( 

2075 (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/") 

2076 )