Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

559 statements  

1# worktree.py -- Working tree operations for Git repositories 

2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Working tree operations for Git repositories.""" 

23 

24from __future__ import annotations 

25 

26import builtins 

27import os 

28import shutil 

29import stat 

30import sys 

31import tempfile 

32import time 

33import warnings 

34from collections.abc import Iterable, Iterator, Sequence 

35from contextlib import contextmanager 

36from pathlib import Path 

37from typing import Any, Callable, Union 

38 

39from .errors import CommitError, HookError 

40from .objects import Blob, Commit, ObjectID, Tag, Tree 

41from .refs import SYMREF, Ref 

42from .repo import ( 

43 GITDIR, 

44 WORKTREES, 

45 Repo, 

46 check_user_identity, 

47 get_user_identity, 

48) 

49 

50 

class WorkTreeInfo:
    """Plain record describing one worktree of a repository.

    Attributes:
      path: Path to the worktree
      head: Current HEAD commit SHA
      branch: Current branch (if not detached)
      bare: Whether this is a bare repository
      detached: Whether HEAD is detached
      locked: Whether the worktree is locked
      prunable: Whether the worktree can be pruned
      lock_reason: Reason for locking (if locked)
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: bytes | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the given worktree properties on the instance.

        Args:
          path: Path to the worktree
          head: Current HEAD commit SHA
          branch: Current branch (if not detached)
          bare: Whether this is a bare repository
          detached: Whether HEAD is detached
          locked: Whether the worktree is locked
          prunable: Whether the worktree can be pruned
          lock_reason: Reason for locking (if locked)
        """
        # Location and what is checked out there.
        self.path = path
        self.head = head
        self.branch = branch
        # State flags.
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def _comparison_key(self) -> tuple:
        """Return every attribute that takes part in equality, in one tuple."""
        return (
            self.path,
            self.head,
            self.branch,
            self.bare,
            self.detached,
            self.locked,
            self.prunable,
            self.lock_reason,
        )

    def __repr__(self) -> str:
        """Return a short representation showing path, branch and detached state."""
        shown_path = repr(self.path)
        shown_branch = repr(self.branch)
        return (
            f"WorkTreeInfo(path={shown_path}, "
            f"branch={shown_branch}, "
            f"detached={self.detached})"
        )

    def __eq__(self, other: object) -> bool:
        """Compare all attributes with another WorkTreeInfo.

        Returns NotImplemented for objects of any other type.
        """
        if isinstance(other, WorkTreeInfo):
            return self._comparison_key() == other._comparison_key()
        return NotImplemented

    def open(self) -> WorkTree:
        """Open this worktree as a WorkTree.

        Returns:
          WorkTree object for this worktree

        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        location = self.path
        return WorkTree(Repo(location), location)

129 

130 

class WorkTreeContainer:
    """Object-style access to the worktree functions of one repository.

    Every method forwards to the matching module-level ``*_worktree``
    function, passing the repository given at construction time. The
    role is similar to how RefsContainer manages references.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
          repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return the worktrees of this repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        infos = list_worktrees(self._repo)
        return infos

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree.

        Args:
          path: Path where the new worktree should be created
          branch: Branch to checkout in the new worktree
          commit: Specific commit to checkout (results in detached HEAD)
          force: Force creation even if branch is already checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: If True, do not raise an error if the directory already exists

        Returns:
          The newly created worktree repository
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Delete a worktree.

        Args:
          path: Path to the worktree to remove
          force: Force removal even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Prune worktree administrative files for missing worktrees.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Don't actually remove anything, just report what would be removed

        Returns:
          List of pruned worktree identifiers
        """
        pruned_ids = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned_ids

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
    ) -> None:
        """Relocate a worktree.

        Args:
          old_path: Current path of the worktree
          new_path: New path for the worktree
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Lock a worktree to prevent it from being pruned.

        Args:
          path: Path to the worktree to lock
          reason: Optional reason for locking
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Release the lock on a worktree.

        Args:
          path: Path to the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield the WorkTreeInfo of every worktree, as returned by list()."""
        for info in self.list():
            yield info

244 

245 

class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, resetting the index and
    managing the sparse-checkout pattern file.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
          repo: The repository this working tree belongs to
          path: Path to the working tree directory; bytes paths are decoded
            with the filesystem encoding and the result is made absolute
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            self.path: str = os.fsdecode(raw_path)
        else:
            self.path = raw_path
        self.path = os.path.abspath(self.path)

    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
    ) -> None:
        """Stage a set of paths.

        For each path the index entry is refreshed from the filesystem:
        paths that no longer exist, or that are neither regular files,
        symlinks nor directories yielding an index entry, are dropped from
        the index; regular files and symlinks are stored as blobs in the
        object store and recorded in the index. The index is written once
        at the end.

        Args:
          fs_paths: A single path or an iterable of paths, relative to the
            repository path

        Raises:
          ValueError: If any of the paths is absolute
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path as a convenience.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()

        def discard(tree_path: bytes) -> None:
            """Remove tree_path from the index if it is present."""
            try:
                del index[tree_path]
            except KeyError:
                pass  # already removed

        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                discard(tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    discard(tree_path)
            elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Neither a regular file nor a symlink: not stored in the index.
                discard(tree_path)
            else:
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(self, fs_paths: Sequence[str]) -> None:
        """Unstage specific files in the index.

        When HEAD cannot be resolved the given entries are simply deleted
        from the index. Otherwise each entry is reset to the version
        recorded in the HEAD commit's tree, or deleted when the path does
        not exist in that tree (i.e. the file was newly added).

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: If a path is neither in the HEAD tree nor in the index
            (or, without a HEAD, is not in the index)
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()
        try:
            commit = self._repo[b"HEAD"]
        except KeyError:
            # no head mean no commit in the repo
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # The working copy may be gone; stat fields then fall back to 0.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            # tree_entry is indexed as (mode, sha); times come from the commit.
            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding (defaults to the i18n.commitEncoding config
            value when that is set)
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: Whether to GPG sign the commit. If None, the
            commit.gpgsign configuration value decides (default False).
            When signing, the key ID is read from the user.signingkey
            configuration value; if that is unset, None is passed to
            Commit.sign.

        Returns:
          New commit SHA1

        Raises:
          CommitError: If the pre-commit or commit-msg hook fails, or the
            ref changed while the commit was being created
          ValueError: If tree is not 40 bytes long, no message was given,
            or the message callback returned None
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                # The ref does not exist yet, so there is no previous head.
                c.parents = merge_heads

        # Handle message after parents are set, so that a callable sees
        # the commit with its parents filled in.
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                # Only move the ref if it still points at the head we read.
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            except KeyError:
                # The ref does not exist: create it, unless someone else
                # created it in the meantime.
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: bytes | None = None) -> None:
        """Reset the index back to a specific tree.

        The path validation function, file mode handling and symlink
        creation are selected from the ``core`` section of the repository
        configuration (``filemode``, ``protectNTFS``, ``protectHFS``,
        ``symlinks``), falling back to platform-based defaults.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[b"HEAD"]
            if isinstance(head, Tag):
                # HEAD resolved to a tag object: follow it to its target.
                _cls, obj = head.object
                head = self._repo.get_object(obj)
            assert isinstance(head, Commit)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The option name must not repeat the section: the settings are
        # "protectNTFS"/"protectHFS" inside the "core" section.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:

            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: Union[str, bytes],
                dst: Union[str, bytes],
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                # Symlinks disabled: write the link target into a plain file.
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer()
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Sets core.sparseCheckout and core.sparseCheckoutCone to "true" and
        writes the configuration back to its file.
        """
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
            return sc_cone == b"true"
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Lines are stripped of surrounding whitespace and blank lines are
        skipped.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist. Any previous
        content of the file is replaced; one pattern is written per line.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(pat + "\n" for pat in patterns)

    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` and ``!/*/``. For each directory
        to include, an inclusion line ``/<dir>/`` is appended that "undoes"
        the prior ``!/*/`` exclude, re-including that directory and
        everything under it. Leading and trailing slashes of each directory
        are stripped first; empty names are skipped and the same line is
        never added twice.

        Args:
          dirs: Directories to include, or None for just the two base patterns.
        """
        patterns = ["/*", "!/*/"]
        for d in dirs or ():
            d = d.strip("/")
            line = f"/{d}/"
            if d and line not in patterns:
                patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)

744 

745 

def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the text stored in a worktree's ``locked`` file.

    Args:
      worktree_path: Path to the worktree's administrative directory

    Returns:
      The whitespace-stripped content of the ``locked`` file, or None when
      that file does not exist or cannot be opened (missing or permission
      denied)
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as handle:
                reason = handle.read().strip()
        except (FileNotFoundError, PermissionError):
            # The file vanished or is unreadable: report it as not locked.
            return None
        return reason
    return None

764 

765 

def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The first element describes the main worktree. It is followed by one
    entry per directory under ``<controldir>/worktrees`` whose ``gitdir``
    file can be read; entries without a readable ``gitdir`` file are
    skipped.

    Args:
      repo: The repository to list worktrees for

    Returns:
      A list of WorkTreeInfo objects
    """
    worktrees = []

    # A HEAD that cannot be resolved (e.g. no commits yet) is reported as
    # head=None, matching how linked worktrees are handled below.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None

    # Add main worktree
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
            if head_contents.startswith(SYMREF):
                ref_name = head_contents[len(SYMREF) :].strip()
                main_wt_info.branch = ref_name
            else:
                # HEAD does not start with the symref prefix: detached.
                main_wt_info.detached = True
                main_wt_info.branch = None
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_path):
                continue

            wt_info = WorkTreeInfo(
                path="",  # Will be set below
                bare=False,
                detached=False,
                locked=False,
                prunable=False,
            )

            # Read gitdir to get actual worktree path
            gitdir_path = os.path.join(worktree_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    # Convert relative path to absolute if needed
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_info.path = os.path.dirname(wt_path)  # Remove .git suffix
            except (FileNotFoundError, PermissionError):
                # Worktree directory is missing, skip it
                # TODO: Consider adding these as prunable worktrees with a placeholder path
                continue

            # Check if worktree path exists
            if wt_info.path and not os.path.exists(wt_info.path):
                wt_info.prunable = True

            # Read HEAD
            head_path = os.path.join(worktree_path, "HEAD")
            try:
                with open(head_path, "rb") as f:
                    head_contents = f.read().strip()
                    if head_contents.startswith(SYMREF):
                        ref_name = head_contents[len(SYMREF) :].strip()
                        wt_info.branch = ref_name
                        # Resolve ref to get commit sha
                        try:
                            wt_info.head = repo.refs[ref_name]
                        except KeyError:
                            wt_info.head = None
                    else:
                        # Detached: the HEAD file content is used as the head.
                        wt_info.detached = True
                        wt_info.branch = None
                        wt_info.head = head_contents
            except (FileNotFoundError, PermissionError):
                wt_info.head = None
                wt_info.branch = None

            # Check if locked
            lock_reason = read_worktree_lock_reason(worktree_path)
            if lock_reason is not None:
                wt_info.locked = True
                wt_info.lock_reason = lock_reason

            worktrees.append(wt_info)

    return worktrees

868 

869 

def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    What gets checked out is decided in this order: an explicit ``commit``
    (always with a detached HEAD, even if ``branch`` is also given), then
    ``branch`` (created from the current HEAD if it doesn't exist), then
    the current HEAD (detached).

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (creates if doesn't exist);
        a missing ``refs/heads/`` prefix is added
      commit: Specific commit to checkout (results in detached HEAD)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists

    Returns:
      The newly created worktree repository

    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        if not branch.startswith(b"refs/heads/"):
            branch = b"refs/heads/" + branch

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from the current HEAD.
            # (commit is necessarily None in this arm.)
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier is taken from the absolute
    # path so that a trailing separator ("wt/") or "." does not produce an
    # empty or meaningless name.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        wt_repo.refs.set_symbolic_ref(b"HEAD", branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo

964 

965 

def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Remove a worktree.

    The worktree is located by comparing ``path`` with the directory that
    each ``<controldir>/worktrees/<id>/gitdir`` file points into. Both the
    working directory (if it still exists) and the administrative
    directory are deleted.

    Args:
      repo: The main repository
      path: Path to the worktree to remove
      force: Force removal even if the worktree is locked. Local changes
        are currently not checked (see TODO below), so they are discarded
        regardless of this flag.

    Raises:
      ValueError: If path is the main working tree, the worktree doesn't
        exist, or it is locked and force is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    target = os.path.abspath(path)

    # Don't allow removing the main worktree
    if target == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the worktree
    worktree_id = None
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
            except (FileNotFoundError, NotADirectoryError, PermissionError):
                # No readable gitdir file, or the entry is a stray regular
                # file rather than an administrative directory: skip it.
                continue

            wt_path = os.fsdecode(gitdir_contents)
            if not os.path.isabs(wt_path):
                wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
            wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

            if os.path.abspath(wt_dir) == target:
                worktree_id = entry
                break

    if worktree_id is None:
        raise ValueError(f"Worktree not found: {path}")

    worktree_control_dir = os.path.join(worktrees_dir, worktree_id)

    # Check if locked
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)

1034 

1035 

def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Drop administrative entries whose worktree no longer exists.

    An entry is considered stale when its gitdir file cannot be read or when
    the directory containing the path recorded in it does not exist. Locked
    entries are never pruned.

    Args:
      repo: The main repository
      expire: If given, only prune entries whose control directory was last
        modified at least this many seconds ago
      dry_run: Report what would be pruned without deleting anything

    Returns:
      Identifiers of the pruned (or, for a dry run, prunable) worktrees
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)

    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)

        # Only unlocked directories are candidates
        if not os.path.isdir(admin_dir):
            continue
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                target = os.fsdecode(fh.read().strip())
            # Relative pointers are interpreted relative to the control dir
            if not os.path.isabs(target):
                target = os.path.abspath(os.path.join(admin_dir, target))
            # The pointer names the worktree's .git file; check its parent
            stale = not os.path.exists(os.path.dirname(target))
        except (FileNotFoundError, PermissionError):
            stale = True

        if not stale:
            continue

        # Honour the grace period, measured from the control dir's mtime
        if expire is not None and now - os.stat(admin_dir).st_mtime < expire:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed

1096 

1097 

def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Lock a worktree to prevent it from being pruned.

    Creates (or truncates) a ``locked`` marker file in the worktree's
    administrative directory and stores the optional reason in it.

    Args:
      repo: The main repository
      path: Path to the worktree to lock
      reason: Optional reason for locking; written to the marker as UTF-8

    Raises:
      ValueError: If no registered worktree matches ``path``
    """
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    lock_path = os.path.join(worktree_control_dir, "locked")
    # Pin the encoding: the locale's default may be unable to represent the
    # reason text, which would make locking fail depending on the platform.
    with open(lock_path, "w", encoding="utf-8") as f:
        if reason:
            f.write(reason)

1115 

1116 

def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Release the lock on a worktree.

    Deletes the ``locked`` marker from the worktree's administrative
    directory. Nothing happens if the marker is absent.

    Args:
      repo: The main repository
      path: Path to the worktree to unlock
    """
    identifier = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, identifier, "locked")

    if not os.path.exists(marker):
        return
    os.remove(marker)

1130 

1131 

def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Find the worktree identifier for the given path.

    Scans every entry of the repository's worktrees control directory, reads
    the path recorded in the entry's gitdir file and compares the directory
    containing that path with ``path`` (both made absolute).

    Args:
      repo: The main repository
      path: Path to the worktree

    Returns:
      The worktree identifier (the name of the matching control directory)

    Raises:
      ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    wt_path = os.fsdecode(gitdir_contents)
                    # Relative pointers are resolved against the control dir
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                    if os.path.abspath(wt_dir) == os.path.abspath(path):
                        return entry
            except (FileNotFoundError, NotADirectoryError, PermissionError):
                # Skip entries that are not readable worktree records. A stray
                # regular file in the worktrees directory makes the open()
                # above fail with NotADirectoryError on POSIX; it must not
                # abort the search for the remaining entries.
                continue

    raise ValueError(f"Worktree not found: {path}")

1170 

1171 

def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
) -> None:
    """Move a worktree to a new location.

    The working directory is moved with ``shutil.move`` and the gitdir
    pointer in the worktree's control directory is rewritten to the new
    location.

    Args:
      repo: The main repository
      old_path: Current path of the worktree
      new_path: New path for the worktree

    Raises:
      ValueError: If ``old_path`` is the main working tree, if the worktree
        doesn't exist, or if the new path already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Location of the worktree's .git file after the move. It is made
    # absolute up front (relative to the caller's working directory) because
    # readers of the gitdir pointer resolve a relative entry against the
    # control directory, which would point somewhere else entirely.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Update the gitdir pointer in the control directory
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")

1215 

1216 

@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created, registered as a worktree through
    ``repo.worktrees.add`` and handed to the ``with`` body. On exit the
    worktree is unregistered again and the directory is deleted, even when
    the body or the unregistration raises.

    Args:
      repo: Dulwich repository object
      prefix: Prefix for the temporary directory name

    Yields:
      The object returned by ``repo.worktrees.add`` for the new worktree
    """
    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix=prefix)

    try:
        # Add worktree
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)

        try:
            yield worktree
        finally:
            # Clean up worktree registration
            repo.worktrees.remove(worktree.path)
    finally:
        # Always clean up the temporary directory. This runs in its own
        # ``finally`` so a failure while unregistering the worktree cannot
        # leave the directory behind.
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)