Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

631 statements  

1# worktree.py -- Working tree operations for Git repositories 

2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Working tree operations for Git repositories.""" 

23 

24from __future__ import annotations 

25 

26import builtins 

27import os 

28import shutil 

29import stat 

30import sys 

31import tempfile 

32import time 

33import warnings 

34from collections.abc import Iterable, Iterator, Sequence 

35from contextlib import contextmanager 

36from pathlib import Path 

37from typing import Any, Callable, Union 

38 

39from .errors import CommitError, HookError 

40from .objects import Blob, Commit, ObjectID, Tag, Tree 

41from .refs import SYMREF, Ref, local_branch_name 

42from .repo import ( 

43 GITDIR, 

44 WORKTREES, 

45 Repo, 

46 check_user_identity, 

47 get_user_identity, 

48) 

49from .trailers import add_trailer_to_message 

50 

51 

52class WorkTreeInfo: 

53 """Information about a single worktree. 

54 

55 Attributes: 

56 path: Path to the worktree 

57 head: Current HEAD commit SHA 

58 branch: Current branch (if not detached) 

59 bare: Whether this is a bare repository 

60 detached: Whether HEAD is detached 

61 locked: Whether the worktree is locked 

62 prunable: Whether the worktree can be pruned 

63 lock_reason: Reason for locking (if locked) 

64 """ 

65 

66 def __init__( 

67 self, 

68 path: str, 

69 head: bytes | None = None, 

70 branch: bytes | None = None, 

71 bare: bool = False, 

72 detached: bool = False, 

73 locked: bool = False, 

74 prunable: bool = False, 

75 lock_reason: str | None = None, 

76 ): 

77 """Initialize WorkTreeInfo. 

78 

79 Args: 

80 path: Path to the worktree 

81 head: Current HEAD commit SHA 

82 branch: Current branch (if not detached) 

83 bare: Whether this is a bare repository 

84 detached: Whether HEAD is detached 

85 locked: Whether the worktree is locked 

86 prunable: Whether the worktree can be pruned 

87 lock_reason: Reason for locking (if locked) 

88 """ 

89 self.path = path 

90 self.head = head 

91 self.branch = branch 

92 self.bare = bare 

93 self.detached = detached 

94 self.locked = locked 

95 self.prunable = prunable 

96 self.lock_reason = lock_reason 

97 

98 def __repr__(self) -> str: 

99 """Return string representation of WorkTreeInfo.""" 

100 return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})" 

101 

102 def __eq__(self, other: object) -> bool: 

103 """Check equality with another WorkTreeInfo.""" 

104 if not isinstance(other, WorkTreeInfo): 

105 return NotImplemented 

106 return ( 

107 self.path == other.path 

108 and self.head == other.head 

109 and self.branch == other.branch 

110 and self.bare == other.bare 

111 and self.detached == other.detached 

112 and self.locked == other.locked 

113 and self.prunable == other.prunable 

114 and self.lock_reason == other.lock_reason 

115 ) 

116 

117 def open(self) -> WorkTree: 

118 """Open this worktree as a WorkTree. 

119 

120 Returns: 

121 WorkTree object for this worktree 

122 

123 Raises: 

124 NotGitRepository: If the worktree path is invalid 

125 """ 

126 from .repo import Repo 

127 

128 repo = Repo(self.path) 

129 return WorkTree(repo, self.path) 

130 

131 

132class WorkTreeContainer: 

133 """Container for managing multiple working trees. 

134 

135 This class manages worktrees for a repository, similar to how 

136 RefsContainer manages references. 

137 """ 

138 

139 def __init__(self, repo: Repo) -> None: 

140 """Initialize a WorkTreeContainer for the given repository. 

141 

142 Args: 

143 repo: The repository this container belongs to 

144 """ 

145 self._repo = repo 

146 

147 def list(self) -> list[WorkTreeInfo]: 

148 """List all worktrees for this repository. 

149 

150 Returns: 

151 A list of WorkTreeInfo objects 

152 """ 

153 return list_worktrees(self._repo) 

154 

155 def add( 

156 self, 

157 path: str | bytes | os.PathLike[str], 

158 branch: str | bytes | None = None, 

159 commit: ObjectID | None = None, 

160 force: bool = False, 

161 detach: bool = False, 

162 exist_ok: bool = False, 

163 ) -> Repo: 

164 """Add a new worktree. 

165 

166 Args: 

167 path: Path where the new worktree should be created 

168 branch: Branch to checkout in the new worktree 

169 commit: Specific commit to checkout (results in detached HEAD) 

170 force: Force creation even if branch is already checked out elsewhere 

171 detach: Detach HEAD in the new worktree 

172 exist_ok: If True, do not raise an error if the directory already exists 

173 

174 Returns: 

175 The newly created worktree repository 

176 """ 

177 return add_worktree( 

178 self._repo, 

179 path, 

180 branch=branch, 

181 commit=commit, 

182 force=force, 

183 detach=detach, 

184 exist_ok=exist_ok, 

185 ) 

186 

187 def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None: 

188 """Remove a worktree. 

189 

190 Args: 

191 path: Path to the worktree to remove 

192 force: Force removal even if there are local changes 

193 """ 

194 remove_worktree(self._repo, path, force=force) 

195 

196 def prune( 

197 self, expire: int | None = None, dry_run: bool = False 

198 ) -> builtins.list[str]: 

199 """Prune worktree administrative files for missing worktrees. 

200 

201 Args: 

202 expire: Only prune worktrees older than this many seconds 

203 dry_run: Don't actually remove anything, just report what would be removed 

204 

205 Returns: 

206 List of pruned worktree identifiers 

207 """ 

208 return prune_worktrees(self._repo, expire=expire, dry_run=dry_run) 

209 

210 def move( 

211 self, 

212 old_path: str | bytes | os.PathLike[str], 

213 new_path: str | bytes | os.PathLike[str], 

214 ) -> None: 

215 """Move a worktree to a new location. 

216 

217 Args: 

218 old_path: Current path of the worktree 

219 new_path: New path for the worktree 

220 """ 

221 move_worktree(self._repo, old_path, new_path) 

222 

223 def lock( 

224 self, path: str | bytes | os.PathLike[str], reason: str | None = None 

225 ) -> None: 

226 """Lock a worktree to prevent it from being pruned. 

227 

228 Args: 

229 path: Path to the worktree to lock 

230 reason: Optional reason for locking 

231 """ 

232 lock_worktree(self._repo, path, reason=reason) 

233 

234 def unlock(self, path: str | bytes | os.PathLike[str]) -> None: 

235 """Unlock a worktree. 

236 

237 Args: 

238 path: Path to the worktree to unlock 

239 """ 

240 unlock_worktree(self._repo, path) 

241 

242 def repair( 

243 self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None 

244 ) -> builtins.list[str]: 

245 """Repair worktree administrative files. 

246 

247 Args: 

248 paths: Optional list of worktree paths to repair. If None, repairs 

249 connections from the main repository to all linked worktrees. 

250 

251 Returns: 

252 List of repaired worktree paths 

253 """ 

254 return repair_worktree(self._repo, paths=paths) 

255 

256 def __iter__(self) -> Iterator[WorkTreeInfo]: 

257 """Iterate over all worktrees.""" 

258 yield from self.list() 

259 

260 

261class WorkTree: 

262 """Working tree operations for a Git repository. 

263 

264 This class provides methods for working with the working tree, 

265 such as staging files, committing changes, and resetting the index. 

266 """ 

267 

268 def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None: 

269 """Initialize a WorkTree for the given repository. 

270 

271 Args: 

272 repo: The repository this working tree belongs to 

273 path: Path to the working tree directory 

274 """ 

275 self._repo = repo 

276 raw_path = os.fspath(path) 

277 if isinstance(raw_path, bytes): 

278 self.path: str = os.fsdecode(raw_path) 

279 else: 

280 self.path = raw_path 

281 self.path = os.path.abspath(self.path) 

282 

283 def stage( 

284 self, 

285 fs_paths: str 

286 | bytes 

287 | os.PathLike[str] 

288 | Iterable[str | bytes | os.PathLike[str]], 

289 ) -> None: 

290 """Stage a set of paths. 

291 

292 Args: 

293 fs_paths: List of paths, relative to the repository path 

294 """ 

295 root_path_bytes = os.fsencode(self.path) 

296 

297 if isinstance(fs_paths, (str, bytes, os.PathLike)): 

298 fs_paths = [fs_paths] 

299 fs_paths = list(fs_paths) 

300 

301 from .index import ( 

302 _fs_to_tree_path, 

303 blob_from_path_and_stat, 

304 index_entry_from_directory, 

305 index_entry_from_stat, 

306 ) 

307 

308 index = self._repo.open_index() 

309 blob_normalizer = self._repo.get_blob_normalizer() 

310 for fs_path in fs_paths: 

311 if not isinstance(fs_path, bytes): 

312 fs_path = os.fsencode(fs_path) 

313 if os.path.isabs(fs_path): 

314 raise ValueError( 

315 f"path {fs_path!r} should be relative to " 

316 "repository root, not absolute" 

317 ) 

318 tree_path = _fs_to_tree_path(fs_path) 

319 full_path = os.path.join(root_path_bytes, fs_path) 

320 try: 

321 st = os.lstat(full_path) 

322 except (FileNotFoundError, NotADirectoryError): 

323 # File no longer exists 

324 try: 

325 del index[tree_path] 

326 except KeyError: 

327 pass # already removed 

328 else: 

329 if stat.S_ISDIR(st.st_mode): 

330 entry = index_entry_from_directory(st, full_path) 

331 if entry: 

332 index[tree_path] = entry 

333 else: 

334 try: 

335 del index[tree_path] 

336 except KeyError: 

337 pass 

338 elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

339 try: 

340 del index[tree_path] 

341 except KeyError: 

342 pass 

343 else: 

344 blob = blob_from_path_and_stat(full_path, st) 

345 blob = blob_normalizer.checkin_normalize(blob, fs_path) 

346 self._repo.object_store.add_object(blob) 

347 index[tree_path] = index_entry_from_stat(st, blob.id) 

348 index.write() 

349 

350 def unstage(self, fs_paths: Sequence[str]) -> None: 

351 """Unstage specific file in the index. 

352 

353 Args: 

354 fs_paths: a list of files to unstage, 

355 relative to the repository path. 

356 """ 

357 from .index import IndexEntry, _fs_to_tree_path 

358 

359 index = self._repo.open_index() 

360 try: 

361 commit = self._repo[b"HEAD"] 

362 except KeyError: 

363 # no head mean no commit in the repo 

364 for fs_path in fs_paths: 

365 tree_path = _fs_to_tree_path(fs_path) 

366 del index[tree_path] 

367 index.write() 

368 return 

369 else: 

370 assert isinstance(commit, Commit), "HEAD must be a commit" 

371 tree_id = commit.tree 

372 

373 for fs_path in fs_paths: 

374 tree_path = _fs_to_tree_path(fs_path) 

375 try: 

376 tree = self._repo.object_store[tree_id] 

377 assert isinstance(tree, Tree) 

378 tree_entry = tree.lookup_path( 

379 self._repo.object_store.__getitem__, tree_path 

380 ) 

381 except KeyError: 

382 # if tree_entry didn't exist, this file was being added, so 

383 # remove index entry 

384 try: 

385 del index[tree_path] 

386 continue 

387 except KeyError as exc: 

388 raise KeyError(f"file '{tree_path.decode()}' not in index") from exc 

389 

390 st = None 

391 try: 

392 st = os.lstat(os.path.join(self.path, fs_path)) 

393 except FileNotFoundError: 

394 pass 

395 

396 blob_obj = self._repo[tree_entry[1]] 

397 assert isinstance(blob_obj, Blob) 

398 blob_size = len(blob_obj.data) 

399 

400 index_entry = IndexEntry( 

401 ctime=(commit.commit_time, 0), 

402 mtime=(commit.commit_time, 0), 

403 dev=st.st_dev if st else 0, 

404 ino=st.st_ino if st else 0, 

405 mode=tree_entry[0], 

406 uid=st.st_uid if st else 0, 

407 gid=st.st_gid if st else 0, 

408 size=blob_size, 

409 sha=tree_entry[1], 

410 flags=0, 

411 extended_flags=0, 

412 ) 

413 

414 index[tree_path] = index_entry 

415 index.write() 

416 

417 def commit( 

418 self, 

419 message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None, 

420 committer: bytes | None = None, 

421 author: bytes | None = None, 

422 commit_timestamp: float | None = None, 

423 commit_timezone: int | None = None, 

424 author_timestamp: float | None = None, 

425 author_timezone: int | None = None, 

426 tree: ObjectID | None = None, 

427 encoding: bytes | None = None, 

428 ref: Ref | None = b"HEAD", 

429 merge_heads: Sequence[ObjectID] | None = None, 

430 no_verify: bool = False, 

431 sign: bool | None = None, 

432 signoff: bool | None = None, 

433 ) -> ObjectID: 

434 """Create a new commit. 

435 

436 If not specified, committer and author default to 

437 get_user_identity(..., 'COMMITTER') 

438 and get_user_identity(..., 'AUTHOR') respectively. 

439 

440 Args: 

441 message: Commit message (bytes or callable that takes (repo, commit) 

442 and returns bytes) 

443 committer: Committer fullname 

444 author: Author fullname 

445 commit_timestamp: Commit timestamp (defaults to now) 

446 commit_timezone: Commit timestamp timezone (defaults to GMT) 

447 author_timestamp: Author timestamp (defaults to commit 

448 timestamp) 

449 author_timezone: Author timestamp timezone 

450 (defaults to commit timestamp timezone) 

451 tree: SHA1 of the tree root to use (if not specified the 

452 current index will be committed). 

453 encoding: Encoding 

454 ref: Optional ref to commit to (defaults to current branch). 

455 If None, creates a dangling commit without updating any ref. 

456 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

457 no_verify: Skip pre-commit and commit-msg hooks 

458 sign: GPG Sign the commit (bool, defaults to False, 

459 pass True to use default GPG key, 

460 pass a str containing Key ID to use a specific GPG key) 

461 signoff: Add Signed-off-by line (DCO) to commit message. 

462 If None, uses format.signoff config. 

463 

464 Returns: 

465 New commit SHA1 

466 """ 

467 try: 

468 if not no_verify: 

469 self._repo.hooks["pre-commit"].execute() 

470 except HookError as exc: 

471 raise CommitError(exc) from exc 

472 except KeyError: # no hook defined, silent fallthrough 

473 pass 

474 

475 c = Commit() 

476 if tree is None: 

477 index = self._repo.open_index() 

478 c.tree = index.commit(self._repo.object_store) 

479 else: 

480 if len(tree) != 40: 

481 raise ValueError("tree must be a 40-byte hex sha string") 

482 c.tree = tree 

483 

484 config = self._repo.get_config_stack() 

485 if merge_heads is None: 

486 merge_heads = self._repo._read_heads("MERGE_HEAD") 

487 if committer is None: 

488 committer = get_user_identity(config, kind="COMMITTER") 

489 check_user_identity(committer) 

490 c.committer = committer 

491 if commit_timestamp is None: 

492 # FIXME: Support GIT_COMMITTER_DATE environment variable 

493 commit_timestamp = time.time() 

494 c.commit_time = int(commit_timestamp) 

495 if commit_timezone is None: 

496 # FIXME: Use current user timezone rather than UTC 

497 commit_timezone = 0 

498 c.commit_timezone = commit_timezone 

499 if author is None: 

500 author = get_user_identity(config, kind="AUTHOR") 

501 c.author = author 

502 check_user_identity(author) 

503 if author_timestamp is None: 

504 # FIXME: Support GIT_AUTHOR_DATE environment variable 

505 author_timestamp = commit_timestamp 

506 c.author_time = int(author_timestamp) 

507 if author_timezone is None: 

508 author_timezone = commit_timezone 

509 c.author_timezone = author_timezone 

510 if encoding is None: 

511 try: 

512 encoding = config.get(("i18n",), "commitEncoding") 

513 except KeyError: 

514 pass # No dice 

515 if encoding is not None: 

516 c.encoding = encoding 

517 # Store original message (might be callable) 

518 original_message = message 

519 message = None # Will be set later after parents are set 

520 

521 # Check if we should sign the commit 

522 if sign is None: 

523 # Check commit.gpgSign configuration when sign is not explicitly set 

524 try: 

525 should_sign = config.get_boolean( 

526 (b"commit",), b"gpgsign", default=False 

527 ) 

528 except KeyError: 

529 should_sign = False # Default to not signing if no config 

530 else: 

531 should_sign = sign 

532 

533 # Get the signing key from config if signing is enabled 

534 keyid = None 

535 if should_sign: 

536 try: 

537 keyid_bytes = config.get((b"user",), b"signingkey") 

538 keyid = keyid_bytes.decode() if keyid_bytes else None 

539 except KeyError: 

540 keyid = None 

541 

542 if ref is None: 

543 # Create a dangling commit 

544 c.parents = merge_heads 

545 else: 

546 try: 

547 old_head = self._repo.refs[ref] 

548 c.parents = [old_head, *merge_heads] 

549 except KeyError: 

550 c.parents = merge_heads 

551 

552 # Handle message after parents are set 

553 if callable(original_message): 

554 message = original_message(self._repo, c) 

555 if message is None: 

556 raise ValueError("Message callback returned None") 

557 else: 

558 message = original_message 

559 

560 if message is None: 

561 # FIXME: Try to read commit message from .git/MERGE_MSG 

562 raise ValueError("No commit message specified") 

563 

564 # Handle signoff 

565 should_signoff = signoff 

566 if should_signoff is None: 

567 # Check format.signOff configuration 

568 try: 

569 should_signoff = config.get_boolean( 

570 (b"format",), b"signoff", default=False 

571 ) 

572 except KeyError: 

573 should_signoff = False 

574 

575 if should_signoff: 

576 # Add Signed-off-by trailer 

577 # Get the committer identity for the signoff 

578 signoff_identity = committer 

579 if isinstance(message, bytes): 

580 message_bytes = message 

581 else: 

582 message_bytes = message.encode("utf-8") 

583 

584 message_bytes = add_trailer_to_message( 

585 message_bytes, 

586 "Signed-off-by", 

587 signoff_identity.decode("utf-8") 

588 if isinstance(signoff_identity, bytes) 

589 else signoff_identity, 

590 separator=":", 

591 where="end", 

592 if_exists="addIfDifferentNeighbor", 

593 if_missing="add", 

594 ) 

595 message = message_bytes 

596 

597 try: 

598 if no_verify: 

599 c.message = message 

600 else: 

601 c.message = self._repo.hooks["commit-msg"].execute(message) 

602 if c.message is None: 

603 c.message = message 

604 except HookError as exc: 

605 raise CommitError(exc) from exc 

606 except KeyError: # no hook defined, message not modified 

607 c.message = message 

608 

609 if ref is None: 

610 # Create a dangling commit 

611 if should_sign: 

612 c.sign(keyid) 

613 self._repo.object_store.add_object(c) 

614 else: 

615 try: 

616 old_head = self._repo.refs[ref] 

617 if should_sign: 

618 c.sign(keyid) 

619 self._repo.object_store.add_object(c) 

620 message_bytes = ( 

621 message.encode() if isinstance(message, str) else message 

622 ) 

623 ok = self._repo.refs.set_if_equals( 

624 ref, 

625 old_head, 

626 c.id, 

627 message=b"commit: " + message_bytes, 

628 committer=committer, 

629 timestamp=int(commit_timestamp) 

630 if commit_timestamp is not None 

631 else None, 

632 timezone=commit_timezone, 

633 ) 

634 except KeyError: 

635 c.parents = merge_heads 

636 if should_sign: 

637 c.sign(keyid) 

638 self._repo.object_store.add_object(c) 

639 message_bytes = ( 

640 message.encode() if isinstance(message, str) else message 

641 ) 

642 ok = self._repo.refs.add_if_new( 

643 ref, 

644 c.id, 

645 message=b"commit: " + message_bytes, 

646 committer=committer, 

647 timestamp=int(commit_timestamp) 

648 if commit_timestamp is not None 

649 else None, 

650 timezone=commit_timezone, 

651 ) 

652 if not ok: 

653 # Fail if the atomic compare-and-swap failed, leaving the 

654 # commit and all its objects as garbage. 

655 raise CommitError(f"{ref!r} changed during commit") 

656 

657 self._repo._del_named_file("MERGE_HEAD") 

658 

659 try: 

660 self._repo.hooks["post-commit"].execute() 

661 except HookError as e: # silent failure 

662 warnings.warn(f"post-commit hook failed: {e}", UserWarning) 

663 except KeyError: # no hook defined, silent fallthrough 

664 pass 

665 

666 # Trigger auto GC if needed 

667 from .gc import maybe_auto_gc 

668 

669 maybe_auto_gc(self._repo) 

670 

671 return c.id 

672 

673 def reset_index(self, tree: bytes | None = None) -> None: 

674 """Reset the index back to a specific tree. 

675 

676 Args: 

677 tree: Tree SHA to reset to, None for current HEAD tree. 

678 """ 

679 from .index import ( 

680 build_index_from_tree, 

681 symlink, 

682 validate_path_element_default, 

683 validate_path_element_hfs, 

684 validate_path_element_ntfs, 

685 ) 

686 

687 if tree is None: 

688 head = self._repo[b"HEAD"] 

689 if isinstance(head, Tag): 

690 _cls, obj = head.object 

691 head = self._repo.get_object(obj) 

692 from .objects import Commit 

693 

694 assert isinstance(head, Commit) 

695 tree = head.tree 

696 config = self._repo.get_config() 

697 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt") 

698 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"): 

699 validate_path_element = validate_path_element_ntfs 

700 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"): 

701 validate_path_element = validate_path_element_hfs 

702 else: 

703 validate_path_element = validate_path_element_default 

704 if config.get_boolean(b"core", b"symlinks", True): 

705 symlink_fn = symlink 

706 else: 

707 

708 def symlink_fn( # type: ignore[misc,unused-ignore] 

709 src: Union[str, bytes], 

710 dst: Union[str, bytes], 

711 target_is_directory: bool = False, 

712 *, 

713 dir_fd: int | None = None, 

714 ) -> None: 

715 with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f: 

716 f.write(src) 

717 

718 blob_normalizer = self._repo.get_blob_normalizer() 

719 return build_index_from_tree( 

720 self.path, 

721 self._repo.index_path(), 

722 self._repo.object_store, 

723 tree, 

724 honor_filemode=honor_filemode, 

725 validate_path_element=validate_path_element, 

726 symlink_fn=symlink_fn, # type: ignore[arg-type,unused-ignore] 

727 blob_normalizer=blob_normalizer, 

728 ) 

729 

730 def _sparse_checkout_file_path(self) -> str: 

731 """Return the path of the sparse-checkout file in this repo's control dir.""" 

732 return os.path.join(self._repo.controldir(), "info", "sparse-checkout") 

733 

734 def configure_for_cone_mode(self) -> None: 

735 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

736 config = self._repo.get_config() 

737 config.set((b"core",), b"sparseCheckout", b"true") 

738 config.set((b"core",), b"sparseCheckoutCone", b"true") 

739 config.write_to_path() 

740 

741 def infer_cone_mode(self) -> bool: 

742 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

743 config = self._repo.get_config() 

744 try: 

745 sc_cone = config.get((b"core",), b"sparseCheckoutCone") 

746 return sc_cone == b"true" 

747 except KeyError: 

748 # If core.sparseCheckoutCone is not set, default to False 

749 return False 

750 

751 def get_sparse_checkout_patterns(self) -> list[str]: 

752 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

753 

754 Returns: 

755 A list of patterns. Returns an empty list if the file is missing. 

756 """ 

757 path = self._sparse_checkout_file_path() 

758 try: 

759 with open(path, encoding="utf-8") as f: 

760 return [line.strip() for line in f if line.strip()] 

761 except FileNotFoundError: 

762 return [] 

763 

764 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None: 

765 """Write the given sparse-checkout patterns into info/sparse-checkout. 

766 

767 Creates the info/ directory if it does not exist. 

768 

769 Args: 

770 patterns: A list of gitignore-style patterns to store. 

771 """ 

772 info_dir = os.path.join(self._repo.controldir(), "info") 

773 os.makedirs(info_dir, exist_ok=True) 

774 

775 path = self._sparse_checkout_file_path() 

776 with open(path, "w", encoding="utf-8") as f: 

777 for pat in patterns: 

778 f.write(pat + "\n") 

779 

780 def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None: 

781 """Write the given cone-mode directory patterns into info/sparse-checkout. 

782 

783 For each directory to include, add an inclusion line that "undoes" the prior 

784 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

785 Never add the same line twice. 

786 """ 

787 patterns = ["/*", "!/*/"] 

788 if dirs: 

789 for d in dirs: 

790 d = d.strip("/") 

791 line = f"/{d}/" 

792 if d and line not in patterns: 

793 patterns.append(line) 

794 self.set_sparse_checkout_patterns(patterns) 

795 

796 

797def read_worktree_lock_reason(worktree_path: str) -> str | None: 

798 """Read the lock reason for a worktree. 

799 

800 Args: 

801 worktree_path: Path to the worktree's administrative directory 

802 

803 Returns: 

804 The lock reason if the worktree is locked, None otherwise 

805 """ 

806 locked_path = os.path.join(worktree_path, "locked") 

807 if not os.path.exists(locked_path): 

808 return None 

809 

810 try: 

811 with open(locked_path) as f: 

812 return f.read().strip() 

813 except (FileNotFoundError, PermissionError): 

814 return None 

815 

816 

817def list_worktrees(repo: Repo) -> list[WorkTreeInfo]: 

818 """List all worktrees for the given repository. 

819 

820 Args: 

821 repo: The repository to list worktrees for 

822 

823 Returns: 

824 A list of WorkTreeInfo objects 

825 """ 

826 worktrees = [] 

827 

828 # Add main worktree 

829 main_wt_info = WorkTreeInfo( 

830 path=repo.path, 

831 head=repo.head(), 

832 bare=repo.bare, 

833 detached=False, 

834 locked=False, 

835 prunable=False, 

836 ) 

837 

838 # Get branch info for main worktree 

839 try: 

840 with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f: 

841 head_contents = f.read().strip() 

842 if head_contents.startswith(SYMREF): 

843 ref_name = head_contents[len(SYMREF) :].strip() 

844 main_wt_info.branch = ref_name 

845 else: 

846 main_wt_info.detached = True 

847 main_wt_info.branch = None 

848 except (FileNotFoundError, PermissionError): 

849 main_wt_info.branch = None 

850 main_wt_info.detached = True 

851 

852 worktrees.append(main_wt_info) 

853 

854 # List additional worktrees 

855 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

856 if os.path.isdir(worktrees_dir): 

857 for entry in os.listdir(worktrees_dir): 

858 worktree_path = os.path.join(worktrees_dir, entry) 

859 if not os.path.isdir(worktree_path): 

860 continue 

861 

862 wt_info = WorkTreeInfo( 

863 path="", # Will be set below 

864 bare=False, 

865 detached=False, 

866 locked=False, 

867 prunable=False, 

868 ) 

869 

870 # Read gitdir to get actual worktree path 

871 gitdir_path = os.path.join(worktree_path, GITDIR) 

872 try: 

873 with open(gitdir_path, "rb") as f: 

874 gitdir_contents = f.read().strip() 

875 # Convert relative path to absolute if needed 

876 wt_path = os.fsdecode(gitdir_contents) 

877 if not os.path.isabs(wt_path): 

878 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

879 wt_info.path = os.path.dirname(wt_path) # Remove .git suffix 

880 except (FileNotFoundError, PermissionError): 

881 # Worktree directory is missing, skip it 

882 # TODO: Consider adding these as prunable worktrees with a placeholder path 

883 continue 

884 

885 # Check if worktree path exists 

886 if wt_info.path and not os.path.exists(wt_info.path): 

887 wt_info.prunable = True 

888 

889 # Read HEAD 

890 head_path = os.path.join(worktree_path, "HEAD") 

891 try: 

892 with open(head_path, "rb") as f: 

893 head_contents = f.read().strip() 

894 if head_contents.startswith(SYMREF): 

895 ref_name = head_contents[len(SYMREF) :].strip() 

896 wt_info.branch = ref_name 

897 # Resolve ref to get commit sha 

898 try: 

899 wt_info.head = repo.refs[ref_name] 

900 except KeyError: 

901 wt_info.head = None 

902 else: 

903 wt_info.detached = True 

904 wt_info.branch = None 

905 wt_info.head = head_contents 

906 except (FileNotFoundError, PermissionError): 

907 wt_info.head = None 

908 wt_info.branch = None 

909 

910 # Check if locked 

911 lock_reason = read_worktree_lock_reason(worktree_path) 

912 if lock_reason is not None: 

913 wt_info.locked = True 

914 wt_info.lock_reason = lock_reason 

915 

916 worktrees.append(wt_info) 

917 

918 return worktrees 

919 

920 

921def add_worktree( 

922 repo: Repo, 

923 path: str | bytes | os.PathLike[str], 

924 branch: str | bytes | None = None, 

925 commit: ObjectID | None = None, 

926 force: bool = False, 

927 detach: bool = False, 

928 exist_ok: bool = False, 

929) -> Repo: 

930 """Add a new worktree to the repository. 

931 

932 Args: 

933 repo: The main repository 

934 path: Path where the new worktree should be created 

935 branch: Branch to checkout in the new worktree (creates if doesn't exist) 

936 commit: Specific commit to checkout (results in detached HEAD) 

937 force: Force creation even if branch is already checked out elsewhere 

938 detach: Detach HEAD in the new worktree 

939 exist_ok: If True, do not raise an error if the directory already exists 

940 

941 Returns: 

942 The newly created worktree repository 

943 

944 Raises: 

945 ValueError: If the path already exists (and exist_ok is False) or branch is already checked out 

946 """ 

947 from .repo import Repo as RepoClass 

948 

949 path = os.fspath(path) 

950 if isinstance(path, bytes): 

951 path = os.fsdecode(path) 

952 

953 # Check if path already exists 

954 if os.path.exists(path) and not exist_ok: 

955 raise ValueError(f"Path already exists: {path}") 

956 

957 # Normalize branch name 

958 if branch is not None: 

959 if isinstance(branch, str): 

960 branch = branch.encode() 

961 branch = local_branch_name(branch) 

962 

963 # Check if branch is already checked out in another worktree 

964 if branch and not force: 

965 for wt in list_worktrees(repo): 

966 if wt.branch == branch: 

967 raise ValueError( 

968 f"Branch {branch.decode()} is already checked out at {wt.path}" 

969 ) 

970 

971 # Determine what to checkout 

972 if commit is not None: 

973 checkout_ref = commit 

974 detach = True 

975 elif branch is not None: 

976 # Check if branch exists 

977 try: 

978 checkout_ref = repo.refs[branch] 

979 except KeyError: 

980 if commit is None: 

981 # Create new branch from HEAD 

982 checkout_ref = repo.head() 

983 repo.refs[branch] = checkout_ref 

984 else: 

985 # Create new branch from specified commit 

986 checkout_ref = commit 

987 repo.refs[branch] = checkout_ref 

988 else: 

989 # Default to current HEAD 

990 checkout_ref = repo.head() 

991 detach = True 

992 

993 # Create the worktree directory 

994 os.makedirs(path, exist_ok=exist_ok) 

995 

996 # Initialize the worktree 

997 identifier = os.path.basename(path) 

998 wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier) 

999 

1000 # Set HEAD appropriately 

1001 if detach: 

1002 # Detached HEAD - write SHA directly to HEAD 

1003 with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f: 

1004 f.write(checkout_ref + b"\n") 

1005 else: 

1006 # Point to branch 

1007 assert branch is not None # Should be guaranteed by logic above 

1008 wt_repo.refs.set_symbolic_ref(b"HEAD", branch) 

1009 

1010 # Reset index to match HEAD 

1011 wt_repo.get_worktree().reset_index() 

1012 

1013 return wt_repo 

1014 

1015 

1016def remove_worktree( 

1017 repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False 

1018) -> None: 

1019 """Remove a worktree. 

1020 

1021 Args: 

1022 repo: The main repository 

1023 path: Path to the worktree to remove 

1024 force: Force removal even if there are local changes 

1025 

1026 Raises: 

1027 ValueError: If the worktree doesn't exist, has local changes, or is locked 

1028 """ 

1029 path = os.fspath(path) 

1030 if isinstance(path, bytes): 

1031 path = os.fsdecode(path) 

1032 

1033 # Don't allow removing the main worktree 

1034 if os.path.abspath(path) == os.path.abspath(repo.path): 

1035 raise ValueError("Cannot remove the main working tree") 

1036 

1037 # Find the worktree 

1038 worktree_found = False 

1039 worktree_id = None 

1040 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1041 

1042 if os.path.isdir(worktrees_dir): 

1043 for entry in os.listdir(worktrees_dir): 

1044 worktree_path = os.path.join(worktrees_dir, entry) 

1045 gitdir_path = os.path.join(worktree_path, GITDIR) 

1046 

1047 try: 

1048 with open(gitdir_path, "rb") as f: 

1049 gitdir_contents = f.read().strip() 

1050 wt_path = os.fsdecode(gitdir_contents) 

1051 if not os.path.isabs(wt_path): 

1052 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1053 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1054 

1055 if os.path.abspath(wt_dir) == os.path.abspath(path): 

1056 worktree_found = True 

1057 worktree_id = entry 

1058 break 

1059 except (FileNotFoundError, PermissionError): 

1060 continue 

1061 

1062 if not worktree_found: 

1063 raise ValueError(f"Worktree not found: {path}") 

1064 

1065 assert worktree_id is not None # Should be set if worktree_found is True 

1066 worktree_control_dir = os.path.join(worktrees_dir, worktree_id) 

1067 

1068 # Check if locked 

1069 if os.path.exists(os.path.join(worktree_control_dir, "locked")): 

1070 if not force: 

1071 raise ValueError(f"Worktree is locked: {path}") 

1072 

1073 # Check for local changes if not forcing 

1074 if not force and os.path.exists(path): 

1075 # TODO: Check for uncommitted changes in the worktree 

1076 pass 

1077 

1078 # Remove the working directory 

1079 if os.path.exists(path): 

1080 shutil.rmtree(path) 

1081 

1082 # Remove the administrative files 

1083 shutil.rmtree(worktree_control_dir) 

1084 

1085 

1086def prune_worktrees( 

1087 repo: Repo, expire: int | None = None, dry_run: bool = False 

1088) -> list[str]: 

1089 """Prune worktree administrative files for missing worktrees. 

1090 

1091 Args: 

1092 repo: The main repository 

1093 expire: Only prune worktrees older than this many seconds 

1094 dry_run: Don't actually remove anything, just report what would be removed 

1095 

1096 Returns: 

1097 List of pruned worktree identifiers 

1098 """ 

1099 pruned: list[str] = [] 

1100 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1101 

1102 if not os.path.isdir(worktrees_dir): 

1103 return pruned 

1104 

1105 current_time = time.time() 

1106 

1107 for entry in os.listdir(worktrees_dir): 

1108 worktree_path = os.path.join(worktrees_dir, entry) 

1109 if not os.path.isdir(worktree_path): 

1110 continue 

1111 

1112 # Skip locked worktrees 

1113 if os.path.exists(os.path.join(worktree_path, "locked")): 

1114 continue 

1115 

1116 should_prune = False 

1117 

1118 # Check if gitdir exists and points to valid location 

1119 gitdir_path = os.path.join(worktree_path, GITDIR) 

1120 try: 

1121 with open(gitdir_path, "rb") as f: 

1122 gitdir_contents = f.read().strip() 

1123 wt_path = os.fsdecode(gitdir_contents) 

1124 if not os.path.isabs(wt_path): 

1125 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1126 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1127 

1128 if not os.path.exists(wt_dir): 

1129 should_prune = True 

1130 except (FileNotFoundError, PermissionError): 

1131 should_prune = True 

1132 

1133 # Check expiry time if specified 

1134 if should_prune and expire is not None: 

1135 stat_info = os.stat(worktree_path) 

1136 age = current_time - stat_info.st_mtime 

1137 if age < expire: 

1138 should_prune = False 

1139 

1140 if should_prune: 

1141 pruned.append(entry) 

1142 if not dry_run: 

1143 shutil.rmtree(worktree_path) 

1144 

1145 return pruned 

1146 

1147 

1148def lock_worktree( 

1149 repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None 

1150) -> None: 

1151 """Lock a worktree to prevent it from being pruned. 

1152 

1153 Args: 

1154 repo: The main repository 

1155 path: Path to the worktree to lock 

1156 reason: Optional reason for locking 

1157 """ 

1158 worktree_id = _find_worktree_id(repo, path) 

1159 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1160 

1161 lock_path = os.path.join(worktree_control_dir, "locked") 

1162 with open(lock_path, "w") as f: 

1163 if reason: 

1164 f.write(reason) 

1165 

1166 

1167def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None: 

1168 """Unlock a worktree. 

1169 

1170 Args: 

1171 repo: The main repository 

1172 path: Path to the worktree to unlock 

1173 """ 

1174 worktree_id = _find_worktree_id(repo, path) 

1175 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1176 

1177 lock_path = os.path.join(worktree_control_dir, "locked") 

1178 if os.path.exists(lock_path): 

1179 os.remove(lock_path) 

1180 

1181 

1182def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str: 

1183 """Find the worktree identifier for the given path. 

1184 

1185 Args: 

1186 repo: The main repository 

1187 path: Path to the worktree 

1188 

1189 Returns: 

1190 The worktree identifier 

1191 

1192 Raises: 

1193 ValueError: If the worktree is not found 

1194 """ 

1195 path = os.fspath(path) 

1196 if isinstance(path, bytes): 

1197 path = os.fsdecode(path) 

1198 

1199 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1200 

1201 if os.path.isdir(worktrees_dir): 

1202 for entry in os.listdir(worktrees_dir): 

1203 worktree_path = os.path.join(worktrees_dir, entry) 

1204 gitdir_path = os.path.join(worktree_path, GITDIR) 

1205 

1206 try: 

1207 with open(gitdir_path, "rb") as f: 

1208 gitdir_contents = f.read().strip() 

1209 wt_path = os.fsdecode(gitdir_contents) 

1210 if not os.path.isabs(wt_path): 

1211 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1212 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1213 

1214 if os.path.abspath(wt_dir) == os.path.abspath(path): 

1215 return entry 

1216 except (FileNotFoundError, PermissionError): 

1217 continue 

1218 

1219 raise ValueError(f"Worktree not found: {path}") 

1220 

1221 

1222def move_worktree( 

1223 repo: Repo, 

1224 old_path: str | bytes | os.PathLike[str], 

1225 new_path: str | bytes | os.PathLike[str], 

1226) -> None: 

1227 """Move a worktree to a new location. 

1228 

1229 Args: 

1230 repo: The main repository 

1231 old_path: Current path of the worktree 

1232 new_path: New path for the worktree 

1233 

1234 Raises: 

1235 ValueError: If the worktree doesn't exist or new path already exists 

1236 """ 

1237 old_path = os.fspath(old_path) 

1238 new_path = os.fspath(new_path) 

1239 if isinstance(old_path, bytes): 

1240 old_path = os.fsdecode(old_path) 

1241 if isinstance(new_path, bytes): 

1242 new_path = os.fsdecode(new_path) 

1243 

1244 # Don't allow moving the main worktree 

1245 if os.path.abspath(old_path) == os.path.abspath(repo.path): 

1246 raise ValueError("Cannot move the main working tree") 

1247 

1248 # Check if new path already exists 

1249 if os.path.exists(new_path): 

1250 raise ValueError(f"Path already exists: {new_path}") 

1251 

1252 # Find the worktree 

1253 worktree_id = _find_worktree_id(repo, old_path) 

1254 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1255 

1256 # Move the actual worktree directory 

1257 shutil.move(old_path, new_path) 

1258 

1259 # Update the gitdir file in the worktree 

1260 gitdir_file = os.path.join(new_path, ".git") 

1261 

1262 # Update the gitdir pointer in the control directory 

1263 with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f: 

1264 f.write(os.fsencode(gitdir_file) + b"\n") 

1265 

1266 

1267def repair_worktree( 

1268 repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None 

1269) -> list[str]: 

1270 """Repair worktree administrative files. 

1271 

1272 This repairs the connection between worktrees and the main repository 

1273 when they have been moved or become corrupted. 

1274 

1275 Args: 

1276 repo: The main repository 

1277 paths: Optional list of worktree paths to repair. If None, repairs 

1278 connections from the main repository to all linked worktrees. 

1279 

1280 Returns: 

1281 List of repaired worktree paths 

1282 

1283 Raises: 

1284 ValueError: If a specified path is not a valid worktree 

1285 """ 

1286 repaired: list[str] = [] 

1287 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1288 

1289 if paths: 

1290 # Repair specific worktrees 

1291 for path in paths: 

1292 path_str = os.fspath(path) 

1293 if isinstance(path_str, bytes): 

1294 path_str = os.fsdecode(path_str) 

1295 path_str = os.path.abspath(path_str) 

1296 

1297 # Check if this is a linked worktree 

1298 gitdir_file = os.path.join(path_str, ".git") 

1299 if not os.path.exists(gitdir_file): 

1300 raise ValueError(f"Not a valid worktree: {path_str}") 

1301 

1302 # Read the .git file to get the worktree control directory 

1303 try: 

1304 with open(gitdir_file, "rb") as f: 

1305 gitdir_content = f.read().strip() 

1306 if gitdir_content.startswith(b"gitdir: "): 

1307 worktree_control_path = gitdir_content[8:].decode() 

1308 else: 

1309 raise ValueError(f"Invalid .git file in worktree: {path_str}") 

1310 except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e: 

1311 raise ValueError( 

1312 f"Cannot read .git file in worktree: {path_str}" 

1313 ) from e 

1314 

1315 # Make the path absolute if it's relative 

1316 if not os.path.isabs(worktree_control_path): 

1317 worktree_control_path = os.path.abspath( 

1318 os.path.join(path_str, worktree_control_path) 

1319 ) 

1320 

1321 # Update the gitdir file in the worktree control directory 

1322 gitdir_pointer = os.path.join(worktree_control_path, GITDIR) 

1323 if os.path.exists(gitdir_pointer): 

1324 # Update to point to the current location 

1325 with open(gitdir_pointer, "wb") as f: 

1326 f.write(os.fsencode(gitdir_file) + b"\n") 

1327 repaired.append(path_str) 

1328 else: 

1329 # Repair from main repository to all linked worktrees 

1330 if not os.path.isdir(worktrees_dir): 

1331 return repaired 

1332 

1333 for entry in os.listdir(worktrees_dir): 

1334 worktree_control_path = os.path.join(worktrees_dir, entry) 

1335 if not os.path.isdir(worktree_control_path): 

1336 continue 

1337 

1338 # Read the gitdir file to find where the worktree thinks it is 

1339 gitdir_path = os.path.join(worktree_control_path, GITDIR) 

1340 try: 

1341 with open(gitdir_path, "rb") as f: 

1342 gitdir_contents = f.read().strip() 

1343 old_gitdir_location = os.fsdecode(gitdir_contents) 

1344 except (FileNotFoundError, PermissionError): 

1345 # Can't repair if we can't read the gitdir file 

1346 continue 

1347 

1348 # Get the worktree directory (remove .git suffix) 

1349 old_worktree_path = os.path.dirname(old_gitdir_location) 

1350 

1351 # Check if the .git file exists at the old location 

1352 if os.path.exists(old_gitdir_location): 

1353 # Try to read and update the .git file to ensure it points back correctly 

1354 try: 

1355 with open(old_gitdir_location, "rb") as f: 

1356 content = f.read().strip() 

1357 if content.startswith(b"gitdir: "): 

1358 current_pointer = content[8:].decode() 

1359 if not os.path.isabs(current_pointer): 

1360 current_pointer = os.path.abspath( 

1361 os.path.join(old_worktree_path, current_pointer) 

1362 ) 

1363 

1364 # If it doesn't point to the right place, fix it 

1365 expected_pointer = worktree_control_path 

1366 if os.path.abspath(current_pointer) != os.path.abspath( 

1367 expected_pointer 

1368 ): 

1369 # Update the .git file to point to the correct location 

1370 with open(old_gitdir_location, "wb") as wf: 

1371 wf.write( 

1372 b"gitdir: " 

1373 + os.fsencode(worktree_control_path) 

1374 + b"\n" 

1375 ) 

1376 repaired.append(old_worktree_path) 

1377 except (PermissionError, UnicodeDecodeError): 

1378 continue 

1379 

1380 return repaired 

1381 

1382 

1383@contextmanager 

1384def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]: 

1385 """Create a temporary worktree that is automatically cleaned up. 

1386 

1387 Args: 

1388 repo: Dulwich repository object 

1389 prefix: Prefix for the temporary directory name 

1390 

1391 Yields: 

1392 Worktree object 

1393 """ 

1394 temp_dir = None 

1395 worktree = None 

1396 

1397 try: 

1398 # Create temporary directory 

1399 temp_dir = tempfile.mkdtemp(prefix=prefix) 

1400 

1401 # Add worktree 

1402 worktree = repo.worktrees.add(temp_dir, exist_ok=True) 

1403 

1404 yield worktree 

1405 

1406 finally: 

1407 # Clean up worktree registration 

1408 if worktree: 

1409 repo.worktrees.remove(worktree.path) 

1410 

1411 # Clean up temporary directory 

1412 if temp_dir and Path(temp_dir).exists(): 

1413 shutil.rmtree(temp_dir)