Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

633 statements  

1# worktree.py -- Working tree operations for Git repositories 

2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Working tree operations for Git repositories.""" 

23 

24from __future__ import annotations 

25 

26__all__ = [ 

27 "WorkTree", 

28 "WorkTreeContainer", 

29 "WorkTreeInfo", 

30 "add_worktree", 

31 "list_worktrees", 

32 "lock_worktree", 

33 "move_worktree", 

34 "prune_worktrees", 

35 "read_worktree_lock_reason", 

36 "remove_worktree", 

37 "repair_worktree", 

38 "temporary_worktree", 

39 "unlock_worktree", 

40] 

41 

42import builtins 

43import os 

44import shutil 

45import stat 

46import sys 

47import tempfile 

48import time 

49import warnings 

50from collections.abc import Callable, Iterable, Iterator, Sequence 

51from contextlib import contextmanager 

52from pathlib import Path 

53from typing import Any 

54 

55from .errors import CommitError, HookError 

56from .objects import Blob, Commit, ObjectID, Tag, Tree 

57from .refs import SYMREF, Ref, local_branch_name 

58from .repo import ( 

59 GITDIR, 

60 WORKTREES, 

61 Repo, 

62 check_user_identity, 

63 get_user_identity, 

64) 

65from .trailers import add_trailer_to_message 

66 

67 

class WorkTreeInfo:
    """Plain record describing one worktree of a repository.

    Attributes:
        path: Path to the worktree
        head: Current HEAD commit SHA
        branch: Current branch (if not detached)
        bare: Whether this is a bare repository
        detached: Whether HEAD is detached
        locked: Whether the worktree is locked
        prunable: Whether the worktree can be pruned
        lock_reason: Reason for locking (if locked)
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: Ref | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the supplied values verbatim on the instance.

        Args:
            path: Path to the worktree
            head: Current HEAD commit SHA
            branch: Current branch (if not detached)
            bare: Whether this is a bare repository
            detached: Whether HEAD is detached
            locked: Whether the worktree is locked
            prunable: Whether the worktree can be pruned
            lock_reason: Reason for locking (if locked)
        """
        self.path = path
        self.head = head
        self.branch = branch
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short debug representation (path, branch, detached)."""
        return (
            f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, "
            f"detached={self.detached})"
        )

    def __eq__(self, other: object) -> bool:
        """Compare all eight attributes with another WorkTreeInfo.

        Returns NotImplemented for any other type so Python can try the
        reflected comparison.
        """
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        # Same attributes, same order, same short-circuiting as a chained
        # ``and`` expression.
        compared = (
            "path",
            "head",
            "branch",
            "bare",
            "detached",
            "locked",
            "prunable",
            "lock_reason",
        )
        return all(getattr(self, name) == getattr(other, name) for name in compared)

    def open(self) -> WorkTree:
        """Open this worktree as a WorkTree.

        Returns:
            WorkTree object for this worktree

        Raises:
            NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)

146 

147 

class WorkTreeContainer:
    """Object-oriented facade over the module-level worktree functions.

    Each method forwards to the corresponding ``*_worktree`` function with
    the repository this container was created for, similar to how
    RefsContainer manages references.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
            repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return a WorkTreeInfo for every worktree of the repository.

        Returns:
            A list of WorkTreeInfo objects
        """
        return list_worktrees(self._repo)

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree via ``add_worktree``.

        Args:
            path: Path where the new worktree should be created
            branch: Branch to checkout in the new worktree
            commit: Specific commit to checkout (results in detached HEAD)
            force: Force creation even if branch is already checked out elsewhere
            detach: Detach HEAD in the new worktree
            exist_ok: If True, do not raise an error if the directory already exists

        Returns:
            The newly created worktree repository
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Remove a worktree via ``remove_worktree``.

        Args:
            path: Path to the worktree to remove
            force: Force removal even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Prune worktree administrative files for missing worktrees.

        Args:
            expire: Only prune worktrees older than this many seconds
            dry_run: Don't actually remove anything, just report what would be removed

        Returns:
            List of pruned worktree identifiers
        """
        pruned = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
    ) -> None:
        """Move a worktree to a new location.

        Args:
            old_path: Current path of the worktree
            new_path: New path for the worktree
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Lock a worktree to prevent it from being pruned.

        Args:
            path: Path to the worktree to lock
            reason: Optional reason for locking
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Unlock a worktree.

        Args:
            path: Path to the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def repair(
        self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
    ) -> builtins.list[str]:
        """Repair worktree administrative files.

        Args:
            paths: Optional list of worktree paths to repair. If None, repairs
                connections from the main repository to all linked worktrees.

        Returns:
            List of repaired worktree paths
        """
        repaired = repair_worktree(self._repo, paths=paths)
        return repaired

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield each worktree in the order ``list()`` reports them."""
        for info in self.list():
            yield info

275 

276 

class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, resetting the index and
    maintaining the sparse-checkout pattern file.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
            repo: The repository this working tree belongs to
            path: Path to the working tree directory; bytes paths are decoded
                with the filesystem encoding and the result is made absolute
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            self.path: str = os.fsdecode(raw_path)
        else:
            self.path = raw_path
        self.path = os.path.abspath(self.path)

    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
    ) -> None:
        """Stage a set of paths.

        For each path the index entry is refreshed from the filesystem:
        missing files and unsupported file types are dropped from the index,
        directories are handled by ``index_entry_from_directory``, and regular
        files/symlinks are stored as blobs after check-in normalization.

        Args:
            fs_paths: A single path or an iterable of paths, relative to the
                working tree root

        Raises:
            ValueError: If any path is absolute
        """
        root_path_bytes = os.fsencode(self.path)

        # A lone path is accepted for convenience; normalize to a list.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Neither a regular file nor a symlink: not trackable.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    blob = blob_from_path_and_stat(full_path, st)
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self._repo.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(self, fs_paths: Sequence[str]) -> None:
        """Unstage specific files in the index.

        Entries that exist in the HEAD tree are reset to the HEAD version;
        entries that do not (newly added files) are removed from the index.

        Args:
            fs_paths: a list of files to unstage,
                relative to the repository path.

        Raises:
            KeyError: If a path is neither in the HEAD tree nor in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()
        try:
            commit = self._repo[Ref(b"HEAD")]
        except KeyError:
            # no head mean no commit in the repo
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # stat data is optional: the file may have been deleted on disk.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: str | bytes | Callable[[Any, Commit], bytes] | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = Ref(b"HEAD"),
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
            message: Commit message (bytes or callable that takes (repo, commit)
                and returns bytes)
            committer: Committer fullname
            author: Author fullname
            commit_timestamp: Commit timestamp (defaults to now)
            commit_timezone: Commit timestamp timezone (defaults to GMT)
            author_timestamp: Author timestamp (defaults to commit
                timestamp)
            author_timezone: Author timestamp timezone
                (defaults to commit timestamp timezone)
            tree: SHA1 of the tree root to use (if not specified the
                current index will be committed).
            encoding: Encoding
            ref: Optional ref to commit to (defaults to current branch).
                If None, creates a dangling commit without updating any ref.
            merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
            no_verify: Skip pre-commit and commit-msg hooks
            sign: GPG Sign the commit (bool, defaults to False,
                pass True to use default GPG key,
                pass a str containing Key ID to use a specific GPG key)
            signoff: Add Signed-off-by line (DCO) to commit message.
                If None, uses format.signoff config.

        Returns:
            New commit SHA1

        Raises:
            CommitError: If a pre-commit or commit-msg hook fails, or the
                target ref changed while the commit was being created
            ValueError: If ``tree`` is not 40 bytes long, or no commit
                message is available
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                c.parents = merge_heads

        # Handle message after parents are set, so a callable can inspect them
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add Signed-off-by trailer
            # Get the committer identity for the signoff
            signoff_identity = committer
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    # commit_timestamp was defaulted above, so it is never None
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # The ref does not exist yet: this is the first commit on it.
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: ObjectID | None = None) -> None:
        """Reset the index back to a specific tree.

        Path validation and symlink handling follow the ``core.filemode``,
        ``core.protectNTFS``, ``core.protectHFS`` and ``core.symlinks``
        settings, falling back to platform defaults when unset.

        Args:
            tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[Ref(b"HEAD")]
            if isinstance(head, Tag):
                # HEAD points at a tag object; use the object it refers to.
                _cls, obj = head.object
                head = self._repo.get_object(obj)

            assert isinstance(head, Commit)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The option name must not repeat the section ("core.protectNTFS"
        # inside section "core" can never match a real setting).
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:

            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: str | bytes,
                dst: str | bytes,
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                # Symlinks disabled: write the link target into a plain file.
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer()
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Sets ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` to
        ``true`` and writes the configuration back to disk.
        """
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
            return sc_cone == b"true"
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Blank lines are skipped and surrounding whitespace is stripped.

        Returns:
            A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Args:
            patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            for pat in patterns:
                f.write(pat + "\n")

    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` and ``!/*/``. For each directory
        to include, a ``/<dir>/`` line is appended that re-includes that
        directory and everything under it. Empty names are skipped and the
        same line is never added twice.

        Args:
            dirs: Directories to include; leading/trailing slashes are ignored
        """
        patterns = ["/*", "!/*/"]
        if dirs:
            for d in dirs:
                d = d.strip("/")
                line = f"/{d}/"
                if d and line not in patterns:
                    patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)

811 

812 

def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the contents of a worktree's ``locked`` file, if any.

    Args:
        worktree_path: Path to the worktree's administrative directory

    Returns:
        The stripped lock reason (possibly an empty string) when the
        ``locked`` file can be read, None when it is missing or unreadable
    """
    lock_file = os.path.join(worktree_path, "locked")
    try:
        handle = open(lock_file)
    except (FileNotFoundError, PermissionError):
        # Not locked, or the marker cannot be inspected.
        return None
    with handle:
        try:
            reason = handle.read()
        except (FileNotFoundError, PermissionError):
            return None
    return reason.strip()

831 

832 

def _read_worktree_head(head_path: str) -> tuple[Ref | None, bytes | None]:
    """Parse a HEAD file into ``(branch, detached_sha)``.

    Exactly one element is non-None: the branch when HEAD is a symbolic
    ref, otherwise the raw (stripped) file contents.

    Raises:
        FileNotFoundError, PermissionError: If the file cannot be read
    """
    with open(head_path, "rb") as f:
        head_contents = f.read().strip()
    if head_contents.startswith(SYMREF):
        return Ref(head_contents[len(SYMREF) :].strip()), None
    return None, head_contents


def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The main worktree comes first, followed by one entry per readable
    administrative directory under ``<controldir>/worktrees``. Linked
    worktrees whose ``gitdir`` file cannot be read are skipped.

    Args:
        repo: The repository to list worktrees for

    Returns:
        A list of WorkTreeInfo objects
    """
    worktrees = []

    # A repository without commits has no resolvable HEAD; report it with
    # head=None instead of failing the whole listing.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None

    # Add main worktree
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        branch, _detached_sha = _read_worktree_head(
            os.path.join(repo.controldir(), "HEAD")
        )
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True
    else:
        main_wt_info.branch = branch
        if branch is None:
            main_wt_info.detached = True

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(worktrees_dir):
        return worktrees

    for entry in os.listdir(worktrees_dir):
        worktree_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_path):
            continue

        wt_info = WorkTreeInfo(
            path="",  # Will be set below
            bare=False,
            detached=False,
            locked=False,
            prunable=False,
        )

        # Read gitdir to get actual worktree path
        gitdir_path = os.path.join(worktree_path, GITDIR)
        try:
            with open(gitdir_path, "rb") as f:
                gitdir_contents = f.read().strip()
        except (FileNotFoundError, PermissionError):
            # Worktree directory is missing, skip it
            # TODO: Consider adding these as prunable worktrees with a placeholder path
            continue
        # Convert relative path to absolute if needed
        wt_path = os.fsdecode(gitdir_contents)
        if not os.path.isabs(wt_path):
            wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
        wt_info.path = os.path.dirname(wt_path)  # Remove .git suffix

        # Check if worktree path exists
        if wt_info.path and not os.path.exists(wt_info.path):
            wt_info.prunable = True

        # Read HEAD
        try:
            branch, detached_sha = _read_worktree_head(
                os.path.join(worktree_path, "HEAD")
            )
        except (FileNotFoundError, PermissionError):
            wt_info.head = None
            wt_info.branch = None
        else:
            wt_info.branch = branch
            if branch is not None:
                # Resolve ref to get commit sha
                try:
                    wt_info.head = repo.refs[branch]
                except KeyError:
                    wt_info.head = None
            else:
                wt_info.detached = True
                wt_info.head = detached_sha

        # Check if locked
        lock_reason = read_worktree_lock_reason(worktree_path)
        if lock_reason is not None:
            wt_info.locked = True
            wt_info.lock_reason = lock_reason

        worktrees.append(wt_info)

    return worktrees

935 

936 

def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    What gets checked out:

    * ``commit`` given: that commit, with a detached HEAD (``branch`` is
      then only used for the "already checked out" check).
    * only ``branch`` given: that branch, created at the current HEAD if it
      does not exist yet.
    * neither given: the current HEAD, detached.

    Args:
        repo: The main repository
        path: Path where the new worktree should be created
        branch: Branch to checkout in the new worktree (creates if doesn't exist)
        commit: Specific commit to checkout (results in detached HEAD)
        force: Force creation even if branch is already checked out elsewhere
        detach: Detach HEAD in the new worktree
        exist_ok: If True, do not raise an error if the directory already exists

    Returns:
        The newly created worktree repository

    Raises:
        ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        branch = local_branch_name(branch)

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from HEAD. (``commit`` is
            # always None on this path, so there is no other start point.)
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier comes from the absolute,
    # normalized path so "wt/", "." and ".." still yield a real name.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = Repo._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        from .refs import HEADREF

        wt_repo.refs.set_symbolic_ref(HEADREF, branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo

1032 

1033 

def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Remove a linked worktree together with its administrative files.

    Args:
        repo: The main repository
        path: Path to the worktree to remove
        force: Remove the worktree even if it is locked

    Raises:
        ValueError: If ``path`` is the main working tree, is not a registered
            worktree, or the worktree is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Don't allow removing the main worktree
    if os.path.abspath(path) == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Use the same lookup as lock/unlock/move; it raises
    # ValueError("Worktree not found: ...") when nothing matches.
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # A "locked" marker file protects the worktree unless the caller forces
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory (it may already be gone)
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)

1102 

1103 

def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Drop administrative directories whose worktree no longer exists.

    Args:
        repo: The main repository
        expire: If given, only prune entries whose administrative directory
            was last modified at least this many seconds ago
        dry_run: Report what would be pruned without deleting anything

    Returns:
        Identifiers of the worktrees that were (or would be) pruned
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)

        # Only unlocked directories are candidates for pruning
        if not os.path.isdir(admin_dir) or os.path.exists(
            os.path.join(admin_dir, "locked")
        ):
            continue

        # An entry is stale when its gitdir file cannot be read or the
        # directory that file points into no longer exists.
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                target = os.fsdecode(fh.read().strip())
            if not os.path.isabs(target):
                target = os.path.abspath(os.path.join(admin_dir, target))
            # dirname() drops the trailing ".git" component
            stale = not os.path.exists(os.path.dirname(target))
        except (FileNotFoundError, PermissionError):
            stale = True

        # Entries younger than the expiry window are kept
        if stale and expire is not None:
            if now - os.stat(admin_dir).st_mtime < expire:
                stale = False

        if not stale:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed

1164 

1165 

def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Mark a worktree as locked so that pruning skips it.

    Args:
        repo: The main repository
        path: Path to the worktree to lock
        reason: Optional text stored in the lock marker file
    """
    wt_id = _find_worktree_id(repo, path)
    marker_path = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    # Opening in "w" mode creates (or truncates) the marker even when no
    # reason is supplied; the file's existence is what signals the lock.
    with open(marker_path, "w") as marker:
        marker.write(reason or "")

1183 

1184 

def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Remove the lock marker of a worktree, if one is present.

    Args:
        repo: The main repository
        path: Path to the worktree to unlock
    """
    wt_id = _find_worktree_id(repo, path)
    marker_path = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    # Unlocking a worktree that is not locked is a no-op
    if not os.path.exists(marker_path):
        return
    os.remove(marker_path)

1198 

1199 

def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Find the worktree identifier for the given path.

    Each subdirectory of the repository's worktrees directory holds a gitdir
    file naming the ``.git`` entry of its worktree; the identifier is the
    name of the subdirectory whose gitdir file resolves to ``path``.

    Args:
        repo: The main repository
        path: Path to the worktree

    Returns:
        The worktree identifier

    Raises:
        ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        # Normalise the lookup target once rather than on every iteration
        target = os.path.abspath(path)

        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)

            # Skip stray non-directory entries: opening "<file>/gitdir"
            # would raise NotADirectoryError, which is not handled below.
            if not os.path.isdir(worktree_path):
                continue

            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    wt_path = os.fsdecode(gitdir_contents)
                    # Relative entries are resolved against the control dir
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                    if os.path.abspath(wt_dir) == target:
                        return entry
            except (FileNotFoundError, PermissionError):
                # Unreadable or missing gitdir file: not a match
                continue

    raise ValueError(f"Worktree not found: {path}")

1238 

1239 

def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
) -> None:
    """Move a worktree to a new location.

    The worktree directory is moved on disk and the gitdir file in its
    control directory is rewritten to name the new location.

    Args:
        repo: The main repository
        old_path: Current path of the worktree
        new_path: New path for the worktree

    Raises:
        ValueError: If ``old_path`` is the main working tree, the worktree
            doesn't exist, or the new path already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Resolve the destination before anything moves. Readers of the gitdir
    # file treat a relative entry as relative to the control directory, so a
    # cwd-relative path written verbatim would point at the wrong place.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Only the control directory's pointer is rewritten here; the ".git"
    # file inside the worktree is left as it was.
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")

1283 

1284 

def repair_worktree(
    repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
) -> list[str]:
    """Repair worktree administrative files.

    This repairs the connection between worktrees and the main repository
    when they have been moved or become corrupted.

    With ``paths``, each worktree's ``.git`` file is read to locate its
    control directory, and that directory's gitdir file (if it exists) is
    rewritten to name the worktree's current location. Without ``paths``,
    every control directory is visited and the ``.git`` file it refers to is
    rewritten when it does not point back at that control directory.

    Args:
        repo: The main repository
        paths: Optional list of worktree paths to repair. If None (or empty),
            repairs connections from the main repository to all linked
            worktrees.

    Returns:
        List of repaired worktree paths

    Raises:
        ValueError: If a specified path is not a valid worktree
    """
    repaired: list[str] = []
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if paths:
        # Repair specific worktrees
        for path in paths:
            path_str = os.fspath(path)
            if isinstance(path_str, bytes):
                path_str = os.fsdecode(path_str)
            path_str = os.path.abspath(path_str)

            # Check if this is a linked worktree
            gitdir_file = os.path.join(path_str, ".git")
            if not os.path.exists(gitdir_file):
                raise ValueError(f"Not a valid worktree: {path_str}")

            # Read the .git file to get the worktree control directory.
            # Any OSError (e.g. ".git" is a directory, as in a main
            # checkout) is reported as the documented ValueError.
            try:
                with open(gitdir_file, "rb") as f:
                    gitdir_content = f.read().strip()
                if gitdir_content.startswith(b"gitdir: "):
                    worktree_control_path = gitdir_content[8:].decode()
                else:
                    raise ValueError(f"Invalid .git file in worktree: {path_str}")
            except (OSError, UnicodeDecodeError) as e:
                raise ValueError(
                    f"Cannot read .git file in worktree: {path_str}"
                ) from e

            # Make the path absolute if it's relative
            if not os.path.isabs(worktree_control_path):
                worktree_control_path = os.path.abspath(
                    os.path.join(path_str, worktree_control_path)
                )

            # Update the gitdir file in the worktree control directory
            gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
            if os.path.exists(gitdir_pointer):
                # Update to point to the current location
                with open(gitdir_pointer, "wb") as f:
                    f.write(os.fsencode(gitdir_file) + b"\n")
                repaired.append(path_str)
    else:
        # Repair from main repository to all linked worktrees
        if not os.path.isdir(worktrees_dir):
            return repaired

        for entry in os.listdir(worktrees_dir):
            worktree_control_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_control_path):
                continue

            # Read the gitdir file to find where the worktree thinks it is
            gitdir_path = os.path.join(worktree_control_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                old_gitdir_location = os.fsdecode(gitdir_contents)
            except (FileNotFoundError, PermissionError):
                # Can't repair if we can't read the gitdir file
                continue

            # Resolve a relative entry against the control directory, the
            # same way the other readers of this file do, instead of
            # letting it be interpreted relative to the process cwd.
            if not os.path.isabs(old_gitdir_location):
                old_gitdir_location = os.path.abspath(
                    os.path.join(worktree_control_path, old_gitdir_location)
                )

            # Get the worktree directory (remove .git suffix)
            old_worktree_path = os.path.dirname(old_gitdir_location)

            # Only a regular ".git" file can be read and rewritten; a
            # missing entry or a directory is skipped.
            if not os.path.isfile(old_gitdir_location):
                continue

            # Try to read and update the .git file to ensure it points back
            # correctly
            try:
                with open(old_gitdir_location, "rb") as f:
                    content = f.read().strip()
                if content.startswith(b"gitdir: "):
                    current_pointer = content[8:].decode()
                    if not os.path.isabs(current_pointer):
                        current_pointer = os.path.abspath(
                            os.path.join(old_worktree_path, current_pointer)
                        )

                    # If it doesn't point to the right place, fix it
                    expected_pointer = worktree_control_path
                    if os.path.abspath(current_pointer) != os.path.abspath(
                        expected_pointer
                    ):
                        # Update the .git file to point to the correct location
                        with open(old_gitdir_location, "wb") as wf:
                            wf.write(
                                b"gitdir: "
                                + os.fsencode(worktree_control_path)
                                + b"\n"
                            )
                        repaired.append(old_worktree_path)
            except (PermissionError, UnicodeDecodeError):
                continue

    return repaired

1399 

1400 

@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered through
    ``repo.worktrees.add``. On exit the worktree is unregistered through
    ``repo.worktrees.remove`` and the directory is deleted; the directory is
    deleted even if unregistering raises.

    Args:
        repo: Dulwich repository object
        prefix: Prefix for the temporary directory name

    Yields:
        The object returned by ``repo.worktrees.add`` for the new worktree
    """
    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix=prefix)

    try:
        # Add worktree
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)

        try:
            yield worktree
        finally:
            # Clean up worktree registration
            repo.worktrees.remove(worktree.path)
    finally:
        # Clean up temporary directory. This runs even when registration or
        # removal fails, so the directory is never leaked.
        if Path(temp_dir).exists():
            shutil.rmtree(temp_dir)