Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

618 statements  

1# worktree.py -- Working tree operations for Git repositories 

2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Working tree operations for Git repositories.""" 

23 

24from __future__ import annotations 

25 

26import builtins 

27import os 

28import shutil 

29import stat 

30import sys 

31import tempfile 

32import time 

33import warnings 

34from collections.abc import Iterable, Iterator, Sequence 

35from contextlib import contextmanager 

36from pathlib import Path 

37from typing import Any, Callable, Union 

38 

39from .errors import CommitError, HookError 

40from .objects import Blob, Commit, ObjectID, Tag, Tree 

41from .refs import SYMREF, Ref 

42from .repo import ( 

43 GITDIR, 

44 WORKTREES, 

45 Repo, 

46 check_user_identity, 

47 get_user_identity, 

48) 

49 

50 

51class WorkTreeInfo: 

52 """Information about a single worktree. 

53 

54 Attributes: 

55 path: Path to the worktree 

56 head: Current HEAD commit SHA 

57 branch: Current branch (if not detached) 

58 bare: Whether this is a bare repository 

59 detached: Whether HEAD is detached 

60 locked: Whether the worktree is locked 

61 prunable: Whether the worktree can be pruned 

62 lock_reason: Reason for locking (if locked) 

63 """ 

64 

65 def __init__( 

66 self, 

67 path: str, 

68 head: bytes | None = None, 

69 branch: bytes | None = None, 

70 bare: bool = False, 

71 detached: bool = False, 

72 locked: bool = False, 

73 prunable: bool = False, 

74 lock_reason: str | None = None, 

75 ): 

76 """Initialize WorkTreeInfo. 

77 

78 Args: 

79 path: Path to the worktree 

80 head: Current HEAD commit SHA 

81 branch: Current branch (if not detached) 

82 bare: Whether this is a bare repository 

83 detached: Whether HEAD is detached 

84 locked: Whether the worktree is locked 

85 prunable: Whether the worktree can be pruned 

86 lock_reason: Reason for locking (if locked) 

87 """ 

88 self.path = path 

89 self.head = head 

90 self.branch = branch 

91 self.bare = bare 

92 self.detached = detached 

93 self.locked = locked 

94 self.prunable = prunable 

95 self.lock_reason = lock_reason 

96 

97 def __repr__(self) -> str: 

98 """Return string representation of WorkTreeInfo.""" 

99 return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})" 

100 

101 def __eq__(self, other: object) -> bool: 

102 """Check equality with another WorkTreeInfo.""" 

103 if not isinstance(other, WorkTreeInfo): 

104 return NotImplemented 

105 return ( 

106 self.path == other.path 

107 and self.head == other.head 

108 and self.branch == other.branch 

109 and self.bare == other.bare 

110 and self.detached == other.detached 

111 and self.locked == other.locked 

112 and self.prunable == other.prunable 

113 and self.lock_reason == other.lock_reason 

114 ) 

115 

116 def open(self) -> WorkTree: 

117 """Open this worktree as a WorkTree. 

118 

119 Returns: 

120 WorkTree object for this worktree 

121 

122 Raises: 

123 NotGitRepository: If the worktree path is invalid 

124 """ 

125 from .repo import Repo 

126 

127 repo = Repo(self.path) 

128 return WorkTree(repo, self.path) 

129 

130 

131class WorkTreeContainer: 

132 """Container for managing multiple working trees. 

133 

134 This class manages worktrees for a repository, similar to how 

135 RefsContainer manages references. 

136 """ 

137 

138 def __init__(self, repo: Repo) -> None: 

139 """Initialize a WorkTreeContainer for the given repository. 

140 

141 Args: 

142 repo: The repository this container belongs to 

143 """ 

144 self._repo = repo 

145 

146 def list(self) -> list[WorkTreeInfo]: 

147 """List all worktrees for this repository. 

148 

149 Returns: 

150 A list of WorkTreeInfo objects 

151 """ 

152 return list_worktrees(self._repo) 

153 

154 def add( 

155 self, 

156 path: str | bytes | os.PathLike[str], 

157 branch: str | bytes | None = None, 

158 commit: ObjectID | None = None, 

159 force: bool = False, 

160 detach: bool = False, 

161 exist_ok: bool = False, 

162 ) -> Repo: 

163 """Add a new worktree. 

164 

165 Args: 

166 path: Path where the new worktree should be created 

167 branch: Branch to checkout in the new worktree 

168 commit: Specific commit to checkout (results in detached HEAD) 

169 force: Force creation even if branch is already checked out elsewhere 

170 detach: Detach HEAD in the new worktree 

171 exist_ok: If True, do not raise an error if the directory already exists 

172 

173 Returns: 

174 The newly created worktree repository 

175 """ 

176 return add_worktree( 

177 self._repo, 

178 path, 

179 branch=branch, 

180 commit=commit, 

181 force=force, 

182 detach=detach, 

183 exist_ok=exist_ok, 

184 ) 

185 

186 def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None: 

187 """Remove a worktree. 

188 

189 Args: 

190 path: Path to the worktree to remove 

191 force: Force removal even if there are local changes 

192 """ 

193 remove_worktree(self._repo, path, force=force) 

194 

195 def prune( 

196 self, expire: int | None = None, dry_run: bool = False 

197 ) -> builtins.list[str]: 

198 """Prune worktree administrative files for missing worktrees. 

199 

200 Args: 

201 expire: Only prune worktrees older than this many seconds 

202 dry_run: Don't actually remove anything, just report what would be removed 

203 

204 Returns: 

205 List of pruned worktree identifiers 

206 """ 

207 return prune_worktrees(self._repo, expire=expire, dry_run=dry_run) 

208 

209 def move( 

210 self, 

211 old_path: str | bytes | os.PathLike[str], 

212 new_path: str | bytes | os.PathLike[str], 

213 ) -> None: 

214 """Move a worktree to a new location. 

215 

216 Args: 

217 old_path: Current path of the worktree 

218 new_path: New path for the worktree 

219 """ 

220 move_worktree(self._repo, old_path, new_path) 

221 

222 def lock( 

223 self, path: str | bytes | os.PathLike[str], reason: str | None = None 

224 ) -> None: 

225 """Lock a worktree to prevent it from being pruned. 

226 

227 Args: 

228 path: Path to the worktree to lock 

229 reason: Optional reason for locking 

230 """ 

231 lock_worktree(self._repo, path, reason=reason) 

232 

233 def unlock(self, path: str | bytes | os.PathLike[str]) -> None: 

234 """Unlock a worktree. 

235 

236 Args: 

237 path: Path to the worktree to unlock 

238 """ 

239 unlock_worktree(self._repo, path) 

240 

241 def repair( 

242 self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None 

243 ) -> builtins.list[str]: 

244 """Repair worktree administrative files. 

245 

246 Args: 

247 paths: Optional list of worktree paths to repair. If None, repairs 

248 connections from the main repository to all linked worktrees. 

249 

250 Returns: 

251 List of repaired worktree paths 

252 """ 

253 return repair_worktree(self._repo, paths=paths) 

254 

255 def __iter__(self) -> Iterator[WorkTreeInfo]: 

256 """Iterate over all worktrees.""" 

257 yield from self.list() 

258 

259 

260class WorkTree: 

261 """Working tree operations for a Git repository. 

262 

263 This class provides methods for working with the working tree, 

264 such as staging files, committing changes, and resetting the index. 

265 """ 

266 

267 def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None: 

268 """Initialize a WorkTree for the given repository. 

269 

270 Args: 

271 repo: The repository this working tree belongs to 

272 path: Path to the working tree directory 

273 """ 

274 self._repo = repo 

275 raw_path = os.fspath(path) 

276 if isinstance(raw_path, bytes): 

277 self.path: str = os.fsdecode(raw_path) 

278 else: 

279 self.path = raw_path 

280 self.path = os.path.abspath(self.path) 

281 

282 def stage( 

283 self, 

284 fs_paths: str 

285 | bytes 

286 | os.PathLike[str] 

287 | Iterable[str | bytes | os.PathLike[str]], 

288 ) -> None: 

289 """Stage a set of paths. 

290 

291 Args: 

292 fs_paths: List of paths, relative to the repository path 

293 """ 

294 root_path_bytes = os.fsencode(self.path) 

295 

296 if isinstance(fs_paths, (str, bytes, os.PathLike)): 

297 fs_paths = [fs_paths] 

298 fs_paths = list(fs_paths) 

299 

300 from .index import ( 

301 _fs_to_tree_path, 

302 blob_from_path_and_stat, 

303 index_entry_from_directory, 

304 index_entry_from_stat, 

305 ) 

306 

307 index = self._repo.open_index() 

308 blob_normalizer = self._repo.get_blob_normalizer() 

309 for fs_path in fs_paths: 

310 if not isinstance(fs_path, bytes): 

311 fs_path = os.fsencode(fs_path) 

312 if os.path.isabs(fs_path): 

313 raise ValueError( 

314 f"path {fs_path!r} should be relative to " 

315 "repository root, not absolute" 

316 ) 

317 tree_path = _fs_to_tree_path(fs_path) 

318 full_path = os.path.join(root_path_bytes, fs_path) 

319 try: 

320 st = os.lstat(full_path) 

321 except (FileNotFoundError, NotADirectoryError): 

322 # File no longer exists 

323 try: 

324 del index[tree_path] 

325 except KeyError: 

326 pass # already removed 

327 else: 

328 if stat.S_ISDIR(st.st_mode): 

329 entry = index_entry_from_directory(st, full_path) 

330 if entry: 

331 index[tree_path] = entry 

332 else: 

333 try: 

334 del index[tree_path] 

335 except KeyError: 

336 pass 

337 elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

338 try: 

339 del index[tree_path] 

340 except KeyError: 

341 pass 

342 else: 

343 blob = blob_from_path_and_stat(full_path, st) 

344 blob = blob_normalizer.checkin_normalize(blob, fs_path) 

345 self._repo.object_store.add_object(blob) 

346 index[tree_path] = index_entry_from_stat(st, blob.id) 

347 index.write() 

348 

349 def unstage(self, fs_paths: Sequence[str]) -> None: 

350 """Unstage specific file in the index. 

351 

352 Args: 

353 fs_paths: a list of files to unstage, 

354 relative to the repository path. 

355 """ 

356 from .index import IndexEntry, _fs_to_tree_path 

357 

358 index = self._repo.open_index() 

359 try: 

360 commit = self._repo[b"HEAD"] 

361 except KeyError: 

362 # no head mean no commit in the repo 

363 for fs_path in fs_paths: 

364 tree_path = _fs_to_tree_path(fs_path) 

365 del index[tree_path] 

366 index.write() 

367 return 

368 else: 

369 assert isinstance(commit, Commit), "HEAD must be a commit" 

370 tree_id = commit.tree 

371 

372 for fs_path in fs_paths: 

373 tree_path = _fs_to_tree_path(fs_path) 

374 try: 

375 tree = self._repo.object_store[tree_id] 

376 assert isinstance(tree, Tree) 

377 tree_entry = tree.lookup_path( 

378 self._repo.object_store.__getitem__, tree_path 

379 ) 

380 except KeyError: 

381 # if tree_entry didn't exist, this file was being added, so 

382 # remove index entry 

383 try: 

384 del index[tree_path] 

385 continue 

386 except KeyError as exc: 

387 raise KeyError(f"file '{tree_path.decode()}' not in index") from exc 

388 

389 st = None 

390 try: 

391 st = os.lstat(os.path.join(self.path, fs_path)) 

392 except FileNotFoundError: 

393 pass 

394 

395 blob_obj = self._repo[tree_entry[1]] 

396 assert isinstance(blob_obj, Blob) 

397 blob_size = len(blob_obj.data) 

398 

399 index_entry = IndexEntry( 

400 ctime=(commit.commit_time, 0), 

401 mtime=(commit.commit_time, 0), 

402 dev=st.st_dev if st else 0, 

403 ino=st.st_ino if st else 0, 

404 mode=tree_entry[0], 

405 uid=st.st_uid if st else 0, 

406 gid=st.st_gid if st else 0, 

407 size=blob_size, 

408 sha=tree_entry[1], 

409 flags=0, 

410 extended_flags=0, 

411 ) 

412 

413 index[tree_path] = index_entry 

414 index.write() 

415 

416 def commit( 

417 self, 

418 message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None, 

419 committer: bytes | None = None, 

420 author: bytes | None = None, 

421 commit_timestamp: float | None = None, 

422 commit_timezone: int | None = None, 

423 author_timestamp: float | None = None, 

424 author_timezone: int | None = None, 

425 tree: ObjectID | None = None, 

426 encoding: bytes | None = None, 

427 ref: Ref | None = b"HEAD", 

428 merge_heads: Sequence[ObjectID] | None = None, 

429 no_verify: bool = False, 

430 sign: bool | None = None, 

431 ) -> ObjectID: 

432 """Create a new commit. 

433 

434 If not specified, committer and author default to 

435 get_user_identity(..., 'COMMITTER') 

436 and get_user_identity(..., 'AUTHOR') respectively. 

437 

438 Args: 

439 message: Commit message (bytes or callable that takes (repo, commit) 

440 and returns bytes) 

441 committer: Committer fullname 

442 author: Author fullname 

443 commit_timestamp: Commit timestamp (defaults to now) 

444 commit_timezone: Commit timestamp timezone (defaults to GMT) 

445 author_timestamp: Author timestamp (defaults to commit 

446 timestamp) 

447 author_timezone: Author timestamp timezone 

448 (defaults to commit timestamp timezone) 

449 tree: SHA1 of the tree root to use (if not specified the 

450 current index will be committed). 

451 encoding: Encoding 

452 ref: Optional ref to commit to (defaults to current branch). 

453 If None, creates a dangling commit without updating any ref. 

454 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

455 no_verify: Skip pre-commit and commit-msg hooks 

456 sign: GPG Sign the commit (bool, defaults to False, 

457 pass True to use default GPG key, 

458 pass a str containing Key ID to use a specific GPG key) 

459 

460 Returns: 

461 New commit SHA1 

462 """ 

463 try: 

464 if not no_verify: 

465 self._repo.hooks["pre-commit"].execute() 

466 except HookError as exc: 

467 raise CommitError(exc) from exc 

468 except KeyError: # no hook defined, silent fallthrough 

469 pass 

470 

471 c = Commit() 

472 if tree is None: 

473 index = self._repo.open_index() 

474 c.tree = index.commit(self._repo.object_store) 

475 else: 

476 if len(tree) != 40: 

477 raise ValueError("tree must be a 40-byte hex sha string") 

478 c.tree = tree 

479 

480 config = self._repo.get_config_stack() 

481 if merge_heads is None: 

482 merge_heads = self._repo._read_heads("MERGE_HEAD") 

483 if committer is None: 

484 committer = get_user_identity(config, kind="COMMITTER") 

485 check_user_identity(committer) 

486 c.committer = committer 

487 if commit_timestamp is None: 

488 # FIXME: Support GIT_COMMITTER_DATE environment variable 

489 commit_timestamp = time.time() 

490 c.commit_time = int(commit_timestamp) 

491 if commit_timezone is None: 

492 # FIXME: Use current user timezone rather than UTC 

493 commit_timezone = 0 

494 c.commit_timezone = commit_timezone 

495 if author is None: 

496 author = get_user_identity(config, kind="AUTHOR") 

497 c.author = author 

498 check_user_identity(author) 

499 if author_timestamp is None: 

500 # FIXME: Support GIT_AUTHOR_DATE environment variable 

501 author_timestamp = commit_timestamp 

502 c.author_time = int(author_timestamp) 

503 if author_timezone is None: 

504 author_timezone = commit_timezone 

505 c.author_timezone = author_timezone 

506 if encoding is None: 

507 try: 

508 encoding = config.get(("i18n",), "commitEncoding") 

509 except KeyError: 

510 pass # No dice 

511 if encoding is not None: 

512 c.encoding = encoding 

513 # Store original message (might be callable) 

514 original_message = message 

515 message = None # Will be set later after parents are set 

516 

517 # Check if we should sign the commit 

518 if sign is None: 

519 # Check commit.gpgSign configuration when sign is not explicitly set 

520 try: 

521 should_sign = config.get_boolean( 

522 (b"commit",), b"gpgsign", default=False 

523 ) 

524 except KeyError: 

525 should_sign = False # Default to not signing if no config 

526 else: 

527 should_sign = sign 

528 

529 # Get the signing key from config if signing is enabled 

530 keyid = None 

531 if should_sign: 

532 try: 

533 keyid_bytes = config.get((b"user",), b"signingkey") 

534 keyid = keyid_bytes.decode() if keyid_bytes else None 

535 except KeyError: 

536 keyid = None 

537 

538 if ref is None: 

539 # Create a dangling commit 

540 c.parents = merge_heads 

541 else: 

542 try: 

543 old_head = self._repo.refs[ref] 

544 c.parents = [old_head, *merge_heads] 

545 except KeyError: 

546 c.parents = merge_heads 

547 

548 # Handle message after parents are set 

549 if callable(original_message): 

550 message = original_message(self._repo, c) 

551 if message is None: 

552 raise ValueError("Message callback returned None") 

553 else: 

554 message = original_message 

555 

556 if message is None: 

557 # FIXME: Try to read commit message from .git/MERGE_MSG 

558 raise ValueError("No commit message specified") 

559 

560 try: 

561 if no_verify: 

562 c.message = message 

563 else: 

564 c.message = self._repo.hooks["commit-msg"].execute(message) 

565 if c.message is None: 

566 c.message = message 

567 except HookError as exc: 

568 raise CommitError(exc) from exc 

569 except KeyError: # no hook defined, message not modified 

570 c.message = message 

571 

572 if ref is None: 

573 # Create a dangling commit 

574 if should_sign: 

575 c.sign(keyid) 

576 self._repo.object_store.add_object(c) 

577 else: 

578 try: 

579 old_head = self._repo.refs[ref] 

580 if should_sign: 

581 c.sign(keyid) 

582 self._repo.object_store.add_object(c) 

583 message_bytes = ( 

584 message.encode() if isinstance(message, str) else message 

585 ) 

586 ok = self._repo.refs.set_if_equals( 

587 ref, 

588 old_head, 

589 c.id, 

590 message=b"commit: " + message_bytes, 

591 committer=committer, 

592 timestamp=int(commit_timestamp) 

593 if commit_timestamp is not None 

594 else None, 

595 timezone=commit_timezone, 

596 ) 

597 except KeyError: 

598 c.parents = merge_heads 

599 if should_sign: 

600 c.sign(keyid) 

601 self._repo.object_store.add_object(c) 

602 message_bytes = ( 

603 message.encode() if isinstance(message, str) else message 

604 ) 

605 ok = self._repo.refs.add_if_new( 

606 ref, 

607 c.id, 

608 message=b"commit: " + message_bytes, 

609 committer=committer, 

610 timestamp=int(commit_timestamp) 

611 if commit_timestamp is not None 

612 else None, 

613 timezone=commit_timezone, 

614 ) 

615 if not ok: 

616 # Fail if the atomic compare-and-swap failed, leaving the 

617 # commit and all its objects as garbage. 

618 raise CommitError(f"{ref!r} changed during commit") 

619 

620 self._repo._del_named_file("MERGE_HEAD") 

621 

622 try: 

623 self._repo.hooks["post-commit"].execute() 

624 except HookError as e: # silent failure 

625 warnings.warn(f"post-commit hook failed: {e}", UserWarning) 

626 except KeyError: # no hook defined, silent fallthrough 

627 pass 

628 

629 # Trigger auto GC if needed 

630 from .gc import maybe_auto_gc 

631 

632 maybe_auto_gc(self._repo) 

633 

634 return c.id 

635 

636 def reset_index(self, tree: bytes | None = None) -> None: 

637 """Reset the index back to a specific tree. 

638 

639 Args: 

640 tree: Tree SHA to reset to, None for current HEAD tree. 

641 """ 

642 from .index import ( 

643 build_index_from_tree, 

644 symlink, 

645 validate_path_element_default, 

646 validate_path_element_hfs, 

647 validate_path_element_ntfs, 

648 ) 

649 

650 if tree is None: 

651 head = self._repo[b"HEAD"] 

652 if isinstance(head, Tag): 

653 _cls, obj = head.object 

654 head = self._repo.get_object(obj) 

655 from .objects import Commit 

656 

657 assert isinstance(head, Commit) 

658 tree = head.tree 

659 config = self._repo.get_config() 

660 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt") 

661 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"): 

662 validate_path_element = validate_path_element_ntfs 

663 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"): 

664 validate_path_element = validate_path_element_hfs 

665 else: 

666 validate_path_element = validate_path_element_default 

667 if config.get_boolean(b"core", b"symlinks", True): 

668 symlink_fn = symlink 

669 else: 

670 

671 def symlink_fn( # type: ignore[misc,unused-ignore] 

672 src: Union[str, bytes], 

673 dst: Union[str, bytes], 

674 target_is_directory: bool = False, 

675 *, 

676 dir_fd: int | None = None, 

677 ) -> None: 

678 with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f: 

679 f.write(src) 

680 

681 blob_normalizer = self._repo.get_blob_normalizer() 

682 return build_index_from_tree( 

683 self.path, 

684 self._repo.index_path(), 

685 self._repo.object_store, 

686 tree, 

687 honor_filemode=honor_filemode, 

688 validate_path_element=validate_path_element, 

689 symlink_fn=symlink_fn, # type: ignore[arg-type,unused-ignore] 

690 blob_normalizer=blob_normalizer, 

691 ) 

692 

693 def _sparse_checkout_file_path(self) -> str: 

694 """Return the path of the sparse-checkout file in this repo's control dir.""" 

695 return os.path.join(self._repo.controldir(), "info", "sparse-checkout") 

696 

697 def configure_for_cone_mode(self) -> None: 

698 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

699 config = self._repo.get_config() 

700 config.set((b"core",), b"sparseCheckout", b"true") 

701 config.set((b"core",), b"sparseCheckoutCone", b"true") 

702 config.write_to_path() 

703 

704 def infer_cone_mode(self) -> bool: 

705 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

706 config = self._repo.get_config() 

707 try: 

708 sc_cone = config.get((b"core",), b"sparseCheckoutCone") 

709 return sc_cone == b"true" 

710 except KeyError: 

711 # If core.sparseCheckoutCone is not set, default to False 

712 return False 

713 

714 def get_sparse_checkout_patterns(self) -> list[str]: 

715 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

716 

717 Returns: 

718 A list of patterns. Returns an empty list if the file is missing. 

719 """ 

720 path = self._sparse_checkout_file_path() 

721 try: 

722 with open(path, encoding="utf-8") as f: 

723 return [line.strip() for line in f if line.strip()] 

724 except FileNotFoundError: 

725 return [] 

726 

727 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None: 

728 """Write the given sparse-checkout patterns into info/sparse-checkout. 

729 

730 Creates the info/ directory if it does not exist. 

731 

732 Args: 

733 patterns: A list of gitignore-style patterns to store. 

734 """ 

735 info_dir = os.path.join(self._repo.controldir(), "info") 

736 os.makedirs(info_dir, exist_ok=True) 

737 

738 path = self._sparse_checkout_file_path() 

739 with open(path, "w", encoding="utf-8") as f: 

740 for pat in patterns: 

741 f.write(pat + "\n") 

742 

743 def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None: 

744 """Write the given cone-mode directory patterns into info/sparse-checkout. 

745 

746 For each directory to include, add an inclusion line that "undoes" the prior 

747 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

748 Never add the same line twice. 

749 """ 

750 patterns = ["/*", "!/*/"] 

751 if dirs: 

752 for d in dirs: 

753 d = d.strip("/") 

754 line = f"/{d}/" 

755 if d and line not in patterns: 

756 patterns.append(line) 

757 self.set_sparse_checkout_patterns(patterns) 

758 

759 

760def read_worktree_lock_reason(worktree_path: str) -> str | None: 

761 """Read the lock reason for a worktree. 

762 

763 Args: 

764 worktree_path: Path to the worktree's administrative directory 

765 

766 Returns: 

767 The lock reason if the worktree is locked, None otherwise 

768 """ 

769 locked_path = os.path.join(worktree_path, "locked") 

770 if not os.path.exists(locked_path): 

771 return None 

772 

773 try: 

774 with open(locked_path) as f: 

775 return f.read().strip() 

776 except (FileNotFoundError, PermissionError): 

777 return None 

778 

779 

780def list_worktrees(repo: Repo) -> list[WorkTreeInfo]: 

781 """List all worktrees for the given repository. 

782 

783 Args: 

784 repo: The repository to list worktrees for 

785 

786 Returns: 

787 A list of WorkTreeInfo objects 

788 """ 

789 worktrees = [] 

790 

791 # Add main worktree 

792 main_wt_info = WorkTreeInfo( 

793 path=repo.path, 

794 head=repo.head(), 

795 bare=repo.bare, 

796 detached=False, 

797 locked=False, 

798 prunable=False, 

799 ) 

800 

801 # Get branch info for main worktree 

802 try: 

803 with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f: 

804 head_contents = f.read().strip() 

805 if head_contents.startswith(SYMREF): 

806 ref_name = head_contents[len(SYMREF) :].strip() 

807 main_wt_info.branch = ref_name 

808 else: 

809 main_wt_info.detached = True 

810 main_wt_info.branch = None 

811 except (FileNotFoundError, PermissionError): 

812 main_wt_info.branch = None 

813 main_wt_info.detached = True 

814 

815 worktrees.append(main_wt_info) 

816 

817 # List additional worktrees 

818 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

819 if os.path.isdir(worktrees_dir): 

820 for entry in os.listdir(worktrees_dir): 

821 worktree_path = os.path.join(worktrees_dir, entry) 

822 if not os.path.isdir(worktree_path): 

823 continue 

824 

825 wt_info = WorkTreeInfo( 

826 path="", # Will be set below 

827 bare=False, 

828 detached=False, 

829 locked=False, 

830 prunable=False, 

831 ) 

832 

833 # Read gitdir to get actual worktree path 

834 gitdir_path = os.path.join(worktree_path, GITDIR) 

835 try: 

836 with open(gitdir_path, "rb") as f: 

837 gitdir_contents = f.read().strip() 

838 # Convert relative path to absolute if needed 

839 wt_path = os.fsdecode(gitdir_contents) 

840 if not os.path.isabs(wt_path): 

841 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

842 wt_info.path = os.path.dirname(wt_path) # Remove .git suffix 

843 except (FileNotFoundError, PermissionError): 

844 # Worktree directory is missing, skip it 

845 # TODO: Consider adding these as prunable worktrees with a placeholder path 

846 continue 

847 

848 # Check if worktree path exists 

849 if wt_info.path and not os.path.exists(wt_info.path): 

850 wt_info.prunable = True 

851 

852 # Read HEAD 

853 head_path = os.path.join(worktree_path, "HEAD") 

854 try: 

855 with open(head_path, "rb") as f: 

856 head_contents = f.read().strip() 

857 if head_contents.startswith(SYMREF): 

858 ref_name = head_contents[len(SYMREF) :].strip() 

859 wt_info.branch = ref_name 

860 # Resolve ref to get commit sha 

861 try: 

862 wt_info.head = repo.refs[ref_name] 

863 except KeyError: 

864 wt_info.head = None 

865 else: 

866 wt_info.detached = True 

867 wt_info.branch = None 

868 wt_info.head = head_contents 

869 except (FileNotFoundError, PermissionError): 

870 wt_info.head = None 

871 wt_info.branch = None 

872 

873 # Check if locked 

874 lock_reason = read_worktree_lock_reason(worktree_path) 

875 if lock_reason is not None: 

876 wt_info.locked = True 

877 wt_info.lock_reason = lock_reason 

878 

879 worktrees.append(wt_info) 

880 

881 return worktrees 

882 

883 

884def add_worktree( 

885 repo: Repo, 

886 path: str | bytes | os.PathLike[str], 

887 branch: str | bytes | None = None, 

888 commit: ObjectID | None = None, 

889 force: bool = False, 

890 detach: bool = False, 

891 exist_ok: bool = False, 

892) -> Repo: 

893 """Add a new worktree to the repository. 

894 

895 Args: 

896 repo: The main repository 

897 path: Path where the new worktree should be created 

898 branch: Branch to checkout in the new worktree (creates if doesn't exist) 

899 commit: Specific commit to checkout (results in detached HEAD) 

900 force: Force creation even if branch is already checked out elsewhere 

901 detach: Detach HEAD in the new worktree 

902 exist_ok: If True, do not raise an error if the directory already exists 

903 

904 Returns: 

905 The newly created worktree repository 

906 

907 Raises: 

908 ValueError: If the path already exists (and exist_ok is False) or branch is already checked out 

909 """ 

910 from .repo import Repo as RepoClass 

911 

912 path = os.fspath(path) 

913 if isinstance(path, bytes): 

914 path = os.fsdecode(path) 

915 

916 # Check if path already exists 

917 if os.path.exists(path) and not exist_ok: 

918 raise ValueError(f"Path already exists: {path}") 

919 

920 # Normalize branch name 

921 if branch is not None: 

922 if isinstance(branch, str): 

923 branch = branch.encode() 

924 if not branch.startswith(b"refs/heads/"): 

925 branch = b"refs/heads/" + branch 

926 

927 # Check if branch is already checked out in another worktree 

928 if branch and not force: 

929 for wt in list_worktrees(repo): 

930 if wt.branch == branch: 

931 raise ValueError( 

932 f"Branch {branch.decode()} is already checked out at {wt.path}" 

933 ) 

934 

935 # Determine what to checkout 

936 if commit is not None: 

937 checkout_ref = commit 

938 detach = True 

939 elif branch is not None: 

940 # Check if branch exists 

941 try: 

942 checkout_ref = repo.refs[branch] 

943 except KeyError: 

944 if commit is None: 

945 # Create new branch from HEAD 

946 checkout_ref = repo.head() 

947 repo.refs[branch] = checkout_ref 

948 else: 

949 # Create new branch from specified commit 

950 checkout_ref = commit 

951 repo.refs[branch] = checkout_ref 

952 else: 

953 # Default to current HEAD 

954 checkout_ref = repo.head() 

955 detach = True 

956 

957 # Create the worktree directory 

958 os.makedirs(path, exist_ok=exist_ok) 

959 

960 # Initialize the worktree 

961 identifier = os.path.basename(path) 

962 wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier) 

963 

964 # Set HEAD appropriately 

965 if detach: 

966 # Detached HEAD - write SHA directly to HEAD 

967 with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f: 

968 f.write(checkout_ref + b"\n") 

969 else: 

970 # Point to branch 

971 assert branch is not None # Should be guaranteed by logic above 

972 wt_repo.refs.set_symbolic_ref(b"HEAD", branch) 

973 

974 # Reset index to match HEAD 

975 wt_repo.get_worktree().reset_index() 

976 

977 return wt_repo 

978 

979 

980def remove_worktree( 

981 repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False 

982) -> None: 

983 """Remove a worktree. 

984 

985 Args: 

986 repo: The main repository 

987 path: Path to the worktree to remove 

988 force: Force removal even if there are local changes 

989 

990 Raises: 

991 ValueError: If the worktree doesn't exist, has local changes, or is locked 

992 """ 

993 path = os.fspath(path) 

994 if isinstance(path, bytes): 

995 path = os.fsdecode(path) 

996 

997 # Don't allow removing the main worktree 

998 if os.path.abspath(path) == os.path.abspath(repo.path): 

999 raise ValueError("Cannot remove the main working tree") 

1000 

1001 # Find the worktree 

1002 worktree_found = False 

1003 worktree_id = None 

1004 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1005 

1006 if os.path.isdir(worktrees_dir): 

1007 for entry in os.listdir(worktrees_dir): 

1008 worktree_path = os.path.join(worktrees_dir, entry) 

1009 gitdir_path = os.path.join(worktree_path, GITDIR) 

1010 

1011 try: 

1012 with open(gitdir_path, "rb") as f: 

1013 gitdir_contents = f.read().strip() 

1014 wt_path = os.fsdecode(gitdir_contents) 

1015 if not os.path.isabs(wt_path): 

1016 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1017 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1018 

1019 if os.path.abspath(wt_dir) == os.path.abspath(path): 

1020 worktree_found = True 

1021 worktree_id = entry 

1022 break 

1023 except (FileNotFoundError, PermissionError): 

1024 continue 

1025 

1026 if not worktree_found: 

1027 raise ValueError(f"Worktree not found: {path}") 

1028 

1029 assert worktree_id is not None # Should be set if worktree_found is True 

1030 worktree_control_dir = os.path.join(worktrees_dir, worktree_id) 

1031 

1032 # Check if locked 

1033 if os.path.exists(os.path.join(worktree_control_dir, "locked")): 

1034 if not force: 

1035 raise ValueError(f"Worktree is locked: {path}") 

1036 

1037 # Check for local changes if not forcing 

1038 if not force and os.path.exists(path): 

1039 # TODO: Check for uncommitted changes in the worktree 

1040 pass 

1041 

1042 # Remove the working directory 

1043 if os.path.exists(path): 

1044 shutil.rmtree(path) 

1045 

1046 # Remove the administrative files 

1047 shutil.rmtree(worktree_control_dir) 

1048 

1049 

1050def prune_worktrees( 

1051 repo: Repo, expire: int | None = None, dry_run: bool = False 

1052) -> list[str]: 

1053 """Prune worktree administrative files for missing worktrees. 

1054 

1055 Args: 

1056 repo: The main repository 

1057 expire: Only prune worktrees older than this many seconds 

1058 dry_run: Don't actually remove anything, just report what would be removed 

1059 

1060 Returns: 

1061 List of pruned worktree identifiers 

1062 """ 

1063 pruned: list[str] = [] 

1064 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1065 

1066 if not os.path.isdir(worktrees_dir): 

1067 return pruned 

1068 

1069 current_time = time.time() 

1070 

1071 for entry in os.listdir(worktrees_dir): 

1072 worktree_path = os.path.join(worktrees_dir, entry) 

1073 if not os.path.isdir(worktree_path): 

1074 continue 

1075 

1076 # Skip locked worktrees 

1077 if os.path.exists(os.path.join(worktree_path, "locked")): 

1078 continue 

1079 

1080 should_prune = False 

1081 

1082 # Check if gitdir exists and points to valid location 

1083 gitdir_path = os.path.join(worktree_path, GITDIR) 

1084 try: 

1085 with open(gitdir_path, "rb") as f: 

1086 gitdir_contents = f.read().strip() 

1087 wt_path = os.fsdecode(gitdir_contents) 

1088 if not os.path.isabs(wt_path): 

1089 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1090 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1091 

1092 if not os.path.exists(wt_dir): 

1093 should_prune = True 

1094 except (FileNotFoundError, PermissionError): 

1095 should_prune = True 

1096 

1097 # Check expiry time if specified 

1098 if should_prune and expire is not None: 

1099 stat_info = os.stat(worktree_path) 

1100 age = current_time - stat_info.st_mtime 

1101 if age < expire: 

1102 should_prune = False 

1103 

1104 if should_prune: 

1105 pruned.append(entry) 

1106 if not dry_run: 

1107 shutil.rmtree(worktree_path) 

1108 

1109 return pruned 

1110 

1111 

1112def lock_worktree( 

1113 repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None 

1114) -> None: 

1115 """Lock a worktree to prevent it from being pruned. 

1116 

1117 Args: 

1118 repo: The main repository 

1119 path: Path to the worktree to lock 

1120 reason: Optional reason for locking 

1121 """ 

1122 worktree_id = _find_worktree_id(repo, path) 

1123 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1124 

1125 lock_path = os.path.join(worktree_control_dir, "locked") 

1126 with open(lock_path, "w") as f: 

1127 if reason: 

1128 f.write(reason) 

1129 

1130 

1131def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None: 

1132 """Unlock a worktree. 

1133 

1134 Args: 

1135 repo: The main repository 

1136 path: Path to the worktree to unlock 

1137 """ 

1138 worktree_id = _find_worktree_id(repo, path) 

1139 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1140 

1141 lock_path = os.path.join(worktree_control_dir, "locked") 

1142 if os.path.exists(lock_path): 

1143 os.remove(lock_path) 

1144 

1145 

1146def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str: 

1147 """Find the worktree identifier for the given path. 

1148 

1149 Args: 

1150 repo: The main repository 

1151 path: Path to the worktree 

1152 

1153 Returns: 

1154 The worktree identifier 

1155 

1156 Raises: 

1157 ValueError: If the worktree is not found 

1158 """ 

1159 path = os.fspath(path) 

1160 if isinstance(path, bytes): 

1161 path = os.fsdecode(path) 

1162 

1163 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1164 

1165 if os.path.isdir(worktrees_dir): 

1166 for entry in os.listdir(worktrees_dir): 

1167 worktree_path = os.path.join(worktrees_dir, entry) 

1168 gitdir_path = os.path.join(worktree_path, GITDIR) 

1169 

1170 try: 

1171 with open(gitdir_path, "rb") as f: 

1172 gitdir_contents = f.read().strip() 

1173 wt_path = os.fsdecode(gitdir_contents) 

1174 if not os.path.isabs(wt_path): 

1175 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1176 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1177 

1178 if os.path.abspath(wt_dir) == os.path.abspath(path): 

1179 return entry 

1180 except (FileNotFoundError, PermissionError): 

1181 continue 

1182 

1183 raise ValueError(f"Worktree not found: {path}") 

1184 

1185 

1186def move_worktree( 

1187 repo: Repo, 

1188 old_path: str | bytes | os.PathLike[str], 

1189 new_path: str | bytes | os.PathLike[str], 

1190) -> None: 

1191 """Move a worktree to a new location. 

1192 

1193 Args: 

1194 repo: The main repository 

1195 old_path: Current path of the worktree 

1196 new_path: New path for the worktree 

1197 

1198 Raises: 

1199 ValueError: If the worktree doesn't exist or new path already exists 

1200 """ 

1201 old_path = os.fspath(old_path) 

1202 new_path = os.fspath(new_path) 

1203 if isinstance(old_path, bytes): 

1204 old_path = os.fsdecode(old_path) 

1205 if isinstance(new_path, bytes): 

1206 new_path = os.fsdecode(new_path) 

1207 

1208 # Don't allow moving the main worktree 

1209 if os.path.abspath(old_path) == os.path.abspath(repo.path): 

1210 raise ValueError("Cannot move the main working tree") 

1211 

1212 # Check if new path already exists 

1213 if os.path.exists(new_path): 

1214 raise ValueError(f"Path already exists: {new_path}") 

1215 

1216 # Find the worktree 

1217 worktree_id = _find_worktree_id(repo, old_path) 

1218 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1219 

1220 # Move the actual worktree directory 

1221 shutil.move(old_path, new_path) 

1222 

1223 # Update the gitdir file in the worktree 

1224 gitdir_file = os.path.join(new_path, ".git") 

1225 

1226 # Update the gitdir pointer in the control directory 

1227 with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f: 

1228 f.write(os.fsencode(gitdir_file) + b"\n") 

1229 

1230 

1231def repair_worktree( 

1232 repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None 

1233) -> list[str]: 

1234 """Repair worktree administrative files. 

1235 

1236 This repairs the connection between worktrees and the main repository 

1237 when they have been moved or become corrupted. 

1238 

1239 Args: 

1240 repo: The main repository 

1241 paths: Optional list of worktree paths to repair. If None, repairs 

1242 connections from the main repository to all linked worktrees. 

1243 

1244 Returns: 

1245 List of repaired worktree paths 

1246 

1247 Raises: 

1248 ValueError: If a specified path is not a valid worktree 

1249 """ 

1250 repaired: list[str] = [] 

1251 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1252 

1253 if paths: 

1254 # Repair specific worktrees 

1255 for path in paths: 

1256 path_str = os.fspath(path) 

1257 if isinstance(path_str, bytes): 

1258 path_str = os.fsdecode(path_str) 

1259 path_str = os.path.abspath(path_str) 

1260 

1261 # Check if this is a linked worktree 

1262 gitdir_file = os.path.join(path_str, ".git") 

1263 if not os.path.exists(gitdir_file): 

1264 raise ValueError(f"Not a valid worktree: {path_str}") 

1265 

1266 # Read the .git file to get the worktree control directory 

1267 try: 

1268 with open(gitdir_file, "rb") as f: 

1269 gitdir_content = f.read().strip() 

1270 if gitdir_content.startswith(b"gitdir: "): 

1271 worktree_control_path = gitdir_content[8:].decode() 

1272 else: 

1273 raise ValueError(f"Invalid .git file in worktree: {path_str}") 

1274 except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e: 

1275 raise ValueError( 

1276 f"Cannot read .git file in worktree: {path_str}" 

1277 ) from e 

1278 

1279 # Make the path absolute if it's relative 

1280 if not os.path.isabs(worktree_control_path): 

1281 worktree_control_path = os.path.abspath( 

1282 os.path.join(path_str, worktree_control_path) 

1283 ) 

1284 

1285 # Update the gitdir file in the worktree control directory 

1286 gitdir_pointer = os.path.join(worktree_control_path, GITDIR) 

1287 if os.path.exists(gitdir_pointer): 

1288 # Update to point to the current location 

1289 with open(gitdir_pointer, "wb") as f: 

1290 f.write(os.fsencode(gitdir_file) + b"\n") 

1291 repaired.append(path_str) 

1292 else: 

1293 # Repair from main repository to all linked worktrees 

1294 if not os.path.isdir(worktrees_dir): 

1295 return repaired 

1296 

1297 for entry in os.listdir(worktrees_dir): 

1298 worktree_control_path = os.path.join(worktrees_dir, entry) 

1299 if not os.path.isdir(worktree_control_path): 

1300 continue 

1301 

1302 # Read the gitdir file to find where the worktree thinks it is 

1303 gitdir_path = os.path.join(worktree_control_path, GITDIR) 

1304 try: 

1305 with open(gitdir_path, "rb") as f: 

1306 gitdir_contents = f.read().strip() 

1307 old_gitdir_location = os.fsdecode(gitdir_contents) 

1308 except (FileNotFoundError, PermissionError): 

1309 # Can't repair if we can't read the gitdir file 

1310 continue 

1311 

1312 # Get the worktree directory (remove .git suffix) 

1313 old_worktree_path = os.path.dirname(old_gitdir_location) 

1314 

1315 # Check if the .git file exists at the old location 

1316 if os.path.exists(old_gitdir_location): 

1317 # Try to read and update the .git file to ensure it points back correctly 

1318 try: 

1319 with open(old_gitdir_location, "rb") as f: 

1320 content = f.read().strip() 

1321 if content.startswith(b"gitdir: "): 

1322 current_pointer = content[8:].decode() 

1323 if not os.path.isabs(current_pointer): 

1324 current_pointer = os.path.abspath( 

1325 os.path.join(old_worktree_path, current_pointer) 

1326 ) 

1327 

1328 # If it doesn't point to the right place, fix it 

1329 expected_pointer = worktree_control_path 

1330 if os.path.abspath(current_pointer) != os.path.abspath( 

1331 expected_pointer 

1332 ): 

1333 # Update the .git file to point to the correct location 

1334 with open(old_gitdir_location, "wb") as wf: 

1335 wf.write( 

1336 b"gitdir: " 

1337 + os.fsencode(worktree_control_path) 

1338 + b"\n" 

1339 ) 

1340 repaired.append(old_worktree_path) 

1341 except (PermissionError, UnicodeDecodeError): 

1342 continue 

1343 

1344 return repaired 

1345 

1346 

1347@contextmanager 

1348def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]: 

1349 """Create a temporary worktree that is automatically cleaned up. 

1350 

1351 Args: 

1352 repo: Dulwich repository object 

1353 prefix: Prefix for the temporary directory name 

1354 

1355 Yields: 

1356 Worktree object 

1357 """ 

1358 temp_dir = None 

1359 worktree = None 

1360 

1361 try: 

1362 # Create temporary directory 

1363 temp_dir = tempfile.mkdtemp(prefix=prefix) 

1364 

1365 # Add worktree 

1366 worktree = repo.worktrees.add(temp_dir, exist_ok=True) 

1367 

1368 yield worktree 

1369 

1370 finally: 

1371 # Clean up worktree registration 

1372 if worktree: 

1373 repo.worktrees.remove(worktree.path) 

1374 

1375 # Clean up temporary directory 

1376 if temp_dir and Path(temp_dir).exists(): 

1377 shutil.rmtree(temp_dir)