Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

553 statements  

1# worktree.py -- Working tree operations for Git repositories 

2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Working tree operations for Git repositories.""" 

23 

24from __future__ import annotations 

25 

26import builtins 

27import os 

28import shutil 

29import stat 

30import sys 

31import tempfile 

32import time 

33import warnings 

34from collections.abc import Iterable, Iterator 

35from contextlib import contextmanager 

36from pathlib import Path 

37from typing import Any, Callable, Union 

38 

39from .errors import CommitError, HookError 

40from .objects import Blob, Commit, ObjectID, Tag, Tree 

41from .refs import SYMREF, Ref 

42from .repo import ( 

43 GITDIR, 

44 WORKTREES, 

45 Repo, 

46 check_user_identity, 

47 get_user_identity, 

48) 

49 

50 

51class WorkTreeInfo: 

52 """Information about a single worktree. 

53 

54 Attributes: 

55 path: Path to the worktree 

56 head: Current HEAD commit SHA 

57 branch: Current branch (if not detached) 

58 bare: Whether this is a bare repository 

59 detached: Whether HEAD is detached 

60 locked: Whether the worktree is locked 

61 prunable: Whether the worktree can be pruned 

62 lock_reason: Reason for locking (if locked) 

63 """ 

64 

65 def __init__( 

66 self, 

67 path: str, 

68 head: bytes | None = None, 

69 branch: bytes | None = None, 

70 bare: bool = False, 

71 detached: bool = False, 

72 locked: bool = False, 

73 prunable: bool = False, 

74 lock_reason: str | None = None, 

75 ): 

76 """Initialize WorkTreeInfo. 

77 

78 Args: 

79 path: Path to the worktree 

80 head: Current HEAD commit SHA 

81 branch: Current branch (if not detached) 

82 bare: Whether this is a bare repository 

83 detached: Whether HEAD is detached 

84 locked: Whether the worktree is locked 

85 prunable: Whether the worktree can be pruned 

86 lock_reason: Reason for locking (if locked) 

87 """ 

88 self.path = path 

89 self.head = head 

90 self.branch = branch 

91 self.bare = bare 

92 self.detached = detached 

93 self.locked = locked 

94 self.prunable = prunable 

95 self.lock_reason = lock_reason 

96 

97 def __repr__(self) -> str: 

98 """Return string representation of WorkTreeInfo.""" 

99 return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})" 

100 

101 def __eq__(self, other: object) -> bool: 

102 """Check equality with another WorkTreeInfo.""" 

103 if not isinstance(other, WorkTreeInfo): 

104 return NotImplemented 

105 return ( 

106 self.path == other.path 

107 and self.head == other.head 

108 and self.branch == other.branch 

109 and self.bare == other.bare 

110 and self.detached == other.detached 

111 and self.locked == other.locked 

112 and self.prunable == other.prunable 

113 and self.lock_reason == other.lock_reason 

114 ) 

115 

116 def open(self) -> WorkTree: 

117 """Open this worktree as a WorkTree. 

118 

119 Returns: 

120 WorkTree object for this worktree 

121 

122 Raises: 

123 NotGitRepository: If the worktree path is invalid 

124 """ 

125 from .repo import Repo 

126 

127 repo = Repo(self.path) 

128 return WorkTree(repo, self.path) 

129 

130 

131class WorkTreeContainer: 

132 """Container for managing multiple working trees. 

133 

134 This class manages worktrees for a repository, similar to how 

135 RefsContainer manages references. 

136 """ 

137 

138 def __init__(self, repo: Repo) -> None: 

139 """Initialize a WorkTreeContainer for the given repository. 

140 

141 Args: 

142 repo: The repository this container belongs to 

143 """ 

144 self._repo = repo 

145 

146 def list(self) -> list[WorkTreeInfo]: 

147 """List all worktrees for this repository. 

148 

149 Returns: 

150 A list of WorkTreeInfo objects 

151 """ 

152 return list_worktrees(self._repo) 

153 

154 def add( 

155 self, 

156 path: str | bytes | os.PathLike, 

157 branch: str | bytes | None = None, 

158 commit: ObjectID | None = None, 

159 force: bool = False, 

160 detach: bool = False, 

161 exist_ok: bool = False, 

162 ) -> Repo: 

163 """Add a new worktree. 

164 

165 Args: 

166 path: Path where the new worktree should be created 

167 branch: Branch to checkout in the new worktree 

168 commit: Specific commit to checkout (results in detached HEAD) 

169 force: Force creation even if branch is already checked out elsewhere 

170 detach: Detach HEAD in the new worktree 

171 exist_ok: If True, do not raise an error if the directory already exists 

172 

173 Returns: 

174 The newly created worktree repository 

175 """ 

176 return add_worktree( 

177 self._repo, 

178 path, 

179 branch=branch, 

180 commit=commit, 

181 force=force, 

182 detach=detach, 

183 exist_ok=exist_ok, 

184 ) 

185 

186 def remove(self, path: str | bytes | os.PathLike, force: bool = False) -> None: 

187 """Remove a worktree. 

188 

189 Args: 

190 path: Path to the worktree to remove 

191 force: Force removal even if there are local changes 

192 """ 

193 remove_worktree(self._repo, path, force=force) 

194 

195 def prune( 

196 self, expire: int | None = None, dry_run: bool = False 

197 ) -> builtins.list[str]: 

198 """Prune worktree administrative files for missing worktrees. 

199 

200 Args: 

201 expire: Only prune worktrees older than this many seconds 

202 dry_run: Don't actually remove anything, just report what would be removed 

203 

204 Returns: 

205 List of pruned worktree identifiers 

206 """ 

207 return prune_worktrees(self._repo, expire=expire, dry_run=dry_run) 

208 

209 def move( 

210 self, old_path: str | bytes | os.PathLike, new_path: str | bytes | os.PathLike 

211 ) -> None: 

212 """Move a worktree to a new location. 

213 

214 Args: 

215 old_path: Current path of the worktree 

216 new_path: New path for the worktree 

217 """ 

218 move_worktree(self._repo, old_path, new_path) 

219 

220 def lock(self, path: str | bytes | os.PathLike, reason: str | None = None) -> None: 

221 """Lock a worktree to prevent it from being pruned. 

222 

223 Args: 

224 path: Path to the worktree to lock 

225 reason: Optional reason for locking 

226 """ 

227 lock_worktree(self._repo, path, reason=reason) 

228 

229 def unlock(self, path: str | bytes | os.PathLike) -> None: 

230 """Unlock a worktree. 

231 

232 Args: 

233 path: Path to the worktree to unlock 

234 """ 

235 unlock_worktree(self._repo, path) 

236 

237 def __iter__(self) -> Iterator[WorkTreeInfo]: 

238 """Iterate over all worktrees.""" 

239 yield from self.list() 

240 

241 

242class WorkTree: 

243 """Working tree operations for a Git repository. 

244 

245 This class provides methods for working with the working tree, 

246 such as staging files, committing changes, and resetting the index. 

247 """ 

248 

249 def __init__(self, repo: Repo, path: str | bytes | os.PathLike) -> None: 

250 """Initialize a WorkTree for the given repository. 

251 

252 Args: 

253 repo: The repository this working tree belongs to 

254 path: Path to the working tree directory 

255 """ 

256 self._repo = repo 

257 raw_path = os.fspath(path) 

258 if isinstance(raw_path, bytes): 

259 self.path: str = os.fsdecode(raw_path) 

260 else: 

261 self.path = raw_path 

262 self.path = os.path.abspath(self.path) 

263 

264 def stage( 

265 self, 

266 fs_paths: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike], 

267 ) -> None: 

268 """Stage a set of paths. 

269 

270 Args: 

271 fs_paths: List of paths, relative to the repository path 

272 """ 

273 root_path_bytes = os.fsencode(self.path) 

274 

275 if isinstance(fs_paths, (str, bytes, os.PathLike)): 

276 fs_paths = [fs_paths] 

277 fs_paths = list(fs_paths) 

278 

279 from .index import ( 

280 _fs_to_tree_path, 

281 blob_from_path_and_stat, 

282 index_entry_from_directory, 

283 index_entry_from_stat, 

284 ) 

285 

286 index = self._repo.open_index() 

287 blob_normalizer = self._repo.get_blob_normalizer() 

288 for fs_path in fs_paths: 

289 if not isinstance(fs_path, bytes): 

290 fs_path = os.fsencode(fs_path) 

291 if os.path.isabs(fs_path): 

292 raise ValueError( 

293 f"path {fs_path!r} should be relative to " 

294 "repository root, not absolute" 

295 ) 

296 tree_path = _fs_to_tree_path(fs_path) 

297 full_path = os.path.join(root_path_bytes, fs_path) 

298 try: 

299 st = os.lstat(full_path) 

300 except (FileNotFoundError, NotADirectoryError): 

301 # File no longer exists 

302 try: 

303 del index[tree_path] 

304 except KeyError: 

305 pass # already removed 

306 else: 

307 if stat.S_ISDIR(st.st_mode): 

308 entry = index_entry_from_directory(st, full_path) 

309 if entry: 

310 index[tree_path] = entry 

311 else: 

312 try: 

313 del index[tree_path] 

314 except KeyError: 

315 pass 

316 elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

317 try: 

318 del index[tree_path] 

319 except KeyError: 

320 pass 

321 else: 

322 blob = blob_from_path_and_stat(full_path, st) 

323 blob = blob_normalizer.checkin_normalize(blob, fs_path) 

324 self._repo.object_store.add_object(blob) 

325 index[tree_path] = index_entry_from_stat(st, blob.id) 

326 index.write() 

327 

328 def unstage(self, fs_paths: list[str]) -> None: 

329 """Unstage specific file in the index. 

330 

331 Args: 

332 fs_paths: a list of files to unstage, 

333 relative to the repository path. 

334 """ 

335 from .index import IndexEntry, _fs_to_tree_path 

336 

337 index = self._repo.open_index() 

338 try: 

339 commit = self._repo[b"HEAD"] 

340 except KeyError: 

341 # no head mean no commit in the repo 

342 for fs_path in fs_paths: 

343 tree_path = _fs_to_tree_path(fs_path) 

344 del index[tree_path] 

345 index.write() 

346 return 

347 else: 

348 assert isinstance(commit, Commit), "HEAD must be a commit" 

349 tree_id = commit.tree 

350 

351 for fs_path in fs_paths: 

352 tree_path = _fs_to_tree_path(fs_path) 

353 try: 

354 tree = self._repo.object_store[tree_id] 

355 assert isinstance(tree, Tree) 

356 tree_entry = tree.lookup_path( 

357 self._repo.object_store.__getitem__, tree_path 

358 ) 

359 except KeyError: 

360 # if tree_entry didn't exist, this file was being added, so 

361 # remove index entry 

362 try: 

363 del index[tree_path] 

364 continue 

365 except KeyError as exc: 

366 raise KeyError(f"file '{tree_path.decode()}' not in index") from exc 

367 

368 st = None 

369 try: 

370 st = os.lstat(os.path.join(self.path, fs_path)) 

371 except FileNotFoundError: 

372 pass 

373 

374 blob_obj = self._repo[tree_entry[1]] 

375 assert isinstance(blob_obj, Blob) 

376 blob_size = len(blob_obj.data) 

377 

378 index_entry = IndexEntry( 

379 ctime=(commit.commit_time, 0), 

380 mtime=(commit.commit_time, 0), 

381 dev=st.st_dev if st else 0, 

382 ino=st.st_ino if st else 0, 

383 mode=tree_entry[0], 

384 uid=st.st_uid if st else 0, 

385 gid=st.st_gid if st else 0, 

386 size=blob_size, 

387 sha=tree_entry[1], 

388 flags=0, 

389 extended_flags=0, 

390 ) 

391 

392 index[tree_path] = index_entry 

393 index.write() 

394 

395 def commit( 

396 self, 

397 message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None, 

398 committer: bytes | None = None, 

399 author: bytes | None = None, 

400 commit_timestamp: float | None = None, 

401 commit_timezone: int | None = None, 

402 author_timestamp: float | None = None, 

403 author_timezone: int | None = None, 

404 tree: ObjectID | None = None, 

405 encoding: bytes | None = None, 

406 ref: Ref | None = b"HEAD", 

407 merge_heads: list[ObjectID] | None = None, 

408 no_verify: bool = False, 

409 sign: bool = False, 

410 ) -> ObjectID: 

411 """Create a new commit. 

412 

413 If not specified, committer and author default to 

414 get_user_identity(..., 'COMMITTER') 

415 and get_user_identity(..., 'AUTHOR') respectively. 

416 

417 Args: 

418 message: Commit message (bytes or callable that takes (repo, commit) 

419 and returns bytes) 

420 committer: Committer fullname 

421 author: Author fullname 

422 commit_timestamp: Commit timestamp (defaults to now) 

423 commit_timezone: Commit timestamp timezone (defaults to GMT) 

424 author_timestamp: Author timestamp (defaults to commit 

425 timestamp) 

426 author_timezone: Author timestamp timezone 

427 (defaults to commit timestamp timezone) 

428 tree: SHA1 of the tree root to use (if not specified the 

429 current index will be committed). 

430 encoding: Encoding 

431 ref: Optional ref to commit to (defaults to current branch). 

432 If None, creates a dangling commit without updating any ref. 

433 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

434 no_verify: Skip pre-commit and commit-msg hooks 

435 sign: GPG Sign the commit (bool, defaults to False, 

436 pass True to use default GPG key, 

437 pass a str containing Key ID to use a specific GPG key) 

438 

439 Returns: 

440 New commit SHA1 

441 """ 

442 try: 

443 if not no_verify: 

444 self._repo.hooks["pre-commit"].execute() 

445 except HookError as exc: 

446 raise CommitError(exc) from exc 

447 except KeyError: # no hook defined, silent fallthrough 

448 pass 

449 

450 c = Commit() 

451 if tree is None: 

452 index = self._repo.open_index() 

453 c.tree = index.commit(self._repo.object_store) 

454 else: 

455 if len(tree) != 40: 

456 raise ValueError("tree must be a 40-byte hex sha string") 

457 c.tree = tree 

458 

459 config = self._repo.get_config_stack() 

460 if merge_heads is None: 

461 merge_heads = self._repo._read_heads("MERGE_HEAD") 

462 if committer is None: 

463 committer = get_user_identity(config, kind="COMMITTER") 

464 check_user_identity(committer) 

465 c.committer = committer 

466 if commit_timestamp is None: 

467 # FIXME: Support GIT_COMMITTER_DATE environment variable 

468 commit_timestamp = time.time() 

469 c.commit_time = int(commit_timestamp) 

470 if commit_timezone is None: 

471 # FIXME: Use current user timezone rather than UTC 

472 commit_timezone = 0 

473 c.commit_timezone = commit_timezone 

474 if author is None: 

475 author = get_user_identity(config, kind="AUTHOR") 

476 c.author = author 

477 check_user_identity(author) 

478 if author_timestamp is None: 

479 # FIXME: Support GIT_AUTHOR_DATE environment variable 

480 author_timestamp = commit_timestamp 

481 c.author_time = int(author_timestamp) 

482 if author_timezone is None: 

483 author_timezone = commit_timezone 

484 c.author_timezone = author_timezone 

485 if encoding is None: 

486 try: 

487 encoding = config.get(("i18n",), "commitEncoding") 

488 except KeyError: 

489 pass # No dice 

490 if encoding is not None: 

491 c.encoding = encoding 

492 # Store original message (might be callable) 

493 original_message = message 

494 message = None # Will be set later after parents are set 

495 

496 # Check if we should sign the commit 

497 should_sign = sign 

498 if sign is None: 

499 # Check commit.gpgSign configuration when sign is not explicitly set 

500 config = self._repo.get_config_stack() 

501 try: 

502 should_sign = config.get_boolean((b"commit",), b"gpgSign") 

503 except KeyError: 

504 should_sign = False # Default to not signing if no config 

505 keyid = sign if isinstance(sign, str) else None 

506 

507 if ref is None: 

508 # Create a dangling commit 

509 c.parents = merge_heads 

510 else: 

511 try: 

512 old_head = self._repo.refs[ref] 

513 c.parents = [old_head, *merge_heads] 

514 except KeyError: 

515 c.parents = merge_heads 

516 

517 # Handle message after parents are set 

518 if callable(original_message): 

519 message = original_message(self._repo, c) 

520 if message is None: 

521 raise ValueError("Message callback returned None") 

522 else: 

523 message = original_message 

524 

525 if message is None: 

526 # FIXME: Try to read commit message from .git/MERGE_MSG 

527 raise ValueError("No commit message specified") 

528 

529 try: 

530 if no_verify: 

531 c.message = message 

532 else: 

533 c.message = self._repo.hooks["commit-msg"].execute(message) 

534 if c.message is None: 

535 c.message = message 

536 except HookError as exc: 

537 raise CommitError(exc) from exc 

538 except KeyError: # no hook defined, message not modified 

539 c.message = message 

540 

541 if ref is None: 

542 # Create a dangling commit 

543 if should_sign: 

544 c.sign(keyid) 

545 self._repo.object_store.add_object(c) 

546 else: 

547 try: 

548 old_head = self._repo.refs[ref] 

549 if should_sign: 

550 c.sign(keyid) 

551 self._repo.object_store.add_object(c) 

552 message_bytes = ( 

553 message.encode() if isinstance(message, str) else message 

554 ) 

555 ok = self._repo.refs.set_if_equals( 

556 ref, 

557 old_head, 

558 c.id, 

559 message=b"commit: " + message_bytes, 

560 committer=committer, 

561 timestamp=int(commit_timestamp) 

562 if commit_timestamp is not None 

563 else None, 

564 timezone=commit_timezone, 

565 ) 

566 except KeyError: 

567 c.parents = merge_heads 

568 if should_sign: 

569 c.sign(keyid) 

570 self._repo.object_store.add_object(c) 

571 message_bytes = ( 

572 message.encode() if isinstance(message, str) else message 

573 ) 

574 ok = self._repo.refs.add_if_new( 

575 ref, 

576 c.id, 

577 message=b"commit: " + message_bytes, 

578 committer=committer, 

579 timestamp=int(commit_timestamp) 

580 if commit_timestamp is not None 

581 else None, 

582 timezone=commit_timezone, 

583 ) 

584 if not ok: 

585 # Fail if the atomic compare-and-swap failed, leaving the 

586 # commit and all its objects as garbage. 

587 raise CommitError(f"{ref!r} changed during commit") 

588 

589 self._repo._del_named_file("MERGE_HEAD") 

590 

591 try: 

592 self._repo.hooks["post-commit"].execute() 

593 except HookError as e: # silent failure 

594 warnings.warn(f"post-commit hook failed: {e}", UserWarning) 

595 except KeyError: # no hook defined, silent fallthrough 

596 pass 

597 

598 # Trigger auto GC if needed 

599 from .gc import maybe_auto_gc 

600 

601 maybe_auto_gc(self._repo) 

602 

603 return c.id 

604 

605 def reset_index(self, tree: bytes | None = None) -> None: 

606 """Reset the index back to a specific tree. 

607 

608 Args: 

609 tree: Tree SHA to reset to, None for current HEAD tree. 

610 """ 

611 from .index import ( 

612 build_index_from_tree, 

613 symlink, 

614 validate_path_element_default, 

615 validate_path_element_hfs, 

616 validate_path_element_ntfs, 

617 ) 

618 

619 if tree is None: 

620 head = self._repo[b"HEAD"] 

621 if isinstance(head, Tag): 

622 _cls, obj = head.object 

623 head = self._repo.get_object(obj) 

624 from .objects import Commit 

625 

626 assert isinstance(head, Commit) 

627 tree = head.tree 

628 config = self._repo.get_config() 

629 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt") 

630 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"): 

631 validate_path_element = validate_path_element_ntfs 

632 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"): 

633 validate_path_element = validate_path_element_hfs 

634 else: 

635 validate_path_element = validate_path_element_default 

636 if config.get_boolean(b"core", b"symlinks", True): 

637 symlink_fn = symlink 

638 else: 

639 

640 def symlink_fn( 

641 src: Union[str, bytes, os.PathLike], 

642 dst: Union[str, bytes, os.PathLike], 

643 target_is_directory: bool = False, 

644 *, 

645 dir_fd: int | None = None, 

646 ) -> None: 

647 with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f: 

648 f.write(src) 

649 

650 blob_normalizer = self._repo.get_blob_normalizer() 

651 return build_index_from_tree( 

652 self.path, 

653 self._repo.index_path(), 

654 self._repo.object_store, 

655 tree, 

656 honor_filemode=honor_filemode, 

657 validate_path_element=validate_path_element, 

658 symlink_fn=symlink_fn, # type: ignore[arg-type] 

659 blob_normalizer=blob_normalizer, 

660 ) 

661 

662 def _sparse_checkout_file_path(self) -> str: 

663 """Return the path of the sparse-checkout file in this repo's control dir.""" 

664 return os.path.join(self._repo.controldir(), "info", "sparse-checkout") 

665 

666 def configure_for_cone_mode(self) -> None: 

667 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

668 config = self._repo.get_config() 

669 config.set((b"core",), b"sparseCheckout", b"true") 

670 config.set((b"core",), b"sparseCheckoutCone", b"true") 

671 config.write_to_path() 

672 

673 def infer_cone_mode(self) -> bool: 

674 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

675 config = self._repo.get_config() 

676 try: 

677 sc_cone = config.get((b"core",), b"sparseCheckoutCone") 

678 return sc_cone == b"true" 

679 except KeyError: 

680 # If core.sparseCheckoutCone is not set, default to False 

681 return False 

682 

683 def get_sparse_checkout_patterns(self) -> list[str]: 

684 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

685 

686 Returns: 

687 A list of patterns. Returns an empty list if the file is missing. 

688 """ 

689 path = self._sparse_checkout_file_path() 

690 try: 

691 with open(path, encoding="utf-8") as f: 

692 return [line.strip() for line in f if line.strip()] 

693 except FileNotFoundError: 

694 return [] 

695 

696 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None: 

697 """Write the given sparse-checkout patterns into info/sparse-checkout. 

698 

699 Creates the info/ directory if it does not exist. 

700 

701 Args: 

702 patterns: A list of gitignore-style patterns to store. 

703 """ 

704 info_dir = os.path.join(self._repo.controldir(), "info") 

705 os.makedirs(info_dir, exist_ok=True) 

706 

707 path = self._sparse_checkout_file_path() 

708 with open(path, "w", encoding="utf-8") as f: 

709 for pat in patterns: 

710 f.write(pat + "\n") 

711 

712 def set_cone_mode_patterns(self, dirs: list[str] | None = None) -> None: 

713 """Write the given cone-mode directory patterns into info/sparse-checkout. 

714 

715 For each directory to include, add an inclusion line that "undoes" the prior 

716 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

717 Never add the same line twice. 

718 """ 

719 patterns = ["/*", "!/*/"] 

720 if dirs: 

721 for d in dirs: 

722 d = d.strip("/") 

723 line = f"/{d}/" 

724 if d and line not in patterns: 

725 patterns.append(line) 

726 self.set_sparse_checkout_patterns(patterns) 

727 

728 

729def read_worktree_lock_reason(worktree_path: str) -> str | None: 

730 """Read the lock reason for a worktree. 

731 

732 Args: 

733 worktree_path: Path to the worktree's administrative directory 

734 

735 Returns: 

736 The lock reason if the worktree is locked, None otherwise 

737 """ 

738 locked_path = os.path.join(worktree_path, "locked") 

739 if not os.path.exists(locked_path): 

740 return None 

741 

742 try: 

743 with open(locked_path) as f: 

744 return f.read().strip() 

745 except (FileNotFoundError, PermissionError): 

746 return None 

747 

748 

749def list_worktrees(repo: Repo) -> list[WorkTreeInfo]: 

750 """List all worktrees for the given repository. 

751 

752 Args: 

753 repo: The repository to list worktrees for 

754 

755 Returns: 

756 A list of WorkTreeInfo objects 

757 """ 

758 worktrees = [] 

759 

760 # Add main worktree 

761 main_wt_info = WorkTreeInfo( 

762 path=repo.path, 

763 head=repo.head(), 

764 bare=repo.bare, 

765 detached=False, 

766 locked=False, 

767 prunable=False, 

768 ) 

769 

770 # Get branch info for main worktree 

771 try: 

772 with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f: 

773 head_contents = f.read().strip() 

774 if head_contents.startswith(SYMREF): 

775 ref_name = head_contents[len(SYMREF) :].strip() 

776 main_wt_info.branch = ref_name 

777 else: 

778 main_wt_info.detached = True 

779 main_wt_info.branch = None 

780 except (FileNotFoundError, PermissionError): 

781 main_wt_info.branch = None 

782 main_wt_info.detached = True 

783 

784 worktrees.append(main_wt_info) 

785 

786 # List additional worktrees 

787 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

788 if os.path.isdir(worktrees_dir): 

789 for entry in os.listdir(worktrees_dir): 

790 worktree_path = os.path.join(worktrees_dir, entry) 

791 if not os.path.isdir(worktree_path): 

792 continue 

793 

794 wt_info = WorkTreeInfo( 

795 path="", # Will be set below 

796 bare=False, 

797 detached=False, 

798 locked=False, 

799 prunable=False, 

800 ) 

801 

802 # Read gitdir to get actual worktree path 

803 gitdir_path = os.path.join(worktree_path, GITDIR) 

804 try: 

805 with open(gitdir_path, "rb") as f: 

806 gitdir_contents = f.read().strip() 

807 # Convert relative path to absolute if needed 

808 wt_path = os.fsdecode(gitdir_contents) 

809 if not os.path.isabs(wt_path): 

810 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

811 wt_info.path = os.path.dirname(wt_path) # Remove .git suffix 

812 except (FileNotFoundError, PermissionError): 

813 # Worktree directory is missing, skip it 

814 # TODO: Consider adding these as prunable worktrees with a placeholder path 

815 continue 

816 

817 # Check if worktree path exists 

818 if wt_info.path and not os.path.exists(wt_info.path): 

819 wt_info.prunable = True 

820 

821 # Read HEAD 

822 head_path = os.path.join(worktree_path, "HEAD") 

823 try: 

824 with open(head_path, "rb") as f: 

825 head_contents = f.read().strip() 

826 if head_contents.startswith(SYMREF): 

827 ref_name = head_contents[len(SYMREF) :].strip() 

828 wt_info.branch = ref_name 

829 # Resolve ref to get commit sha 

830 try: 

831 wt_info.head = repo.refs[ref_name] 

832 except KeyError: 

833 wt_info.head = None 

834 else: 

835 wt_info.detached = True 

836 wt_info.branch = None 

837 wt_info.head = head_contents 

838 except (FileNotFoundError, PermissionError): 

839 wt_info.head = None 

840 wt_info.branch = None 

841 

842 # Check if locked 

843 lock_reason = read_worktree_lock_reason(worktree_path) 

844 if lock_reason is not None: 

845 wt_info.locked = True 

846 wt_info.lock_reason = lock_reason 

847 

848 worktrees.append(wt_info) 

849 

850 return worktrees 

851 

852 

853def add_worktree( 

854 repo: Repo, 

855 path: str | bytes | os.PathLike, 

856 branch: str | bytes | None = None, 

857 commit: ObjectID | None = None, 

858 force: bool = False, 

859 detach: bool = False, 

860 exist_ok: bool = False, 

861) -> Repo: 

862 """Add a new worktree to the repository. 

863 

864 Args: 

865 repo: The main repository 

866 path: Path where the new worktree should be created 

867 branch: Branch to checkout in the new worktree (creates if doesn't exist) 

868 commit: Specific commit to checkout (results in detached HEAD) 

869 force: Force creation even if branch is already checked out elsewhere 

870 detach: Detach HEAD in the new worktree 

871 exist_ok: If True, do not raise an error if the directory already exists 

872 

873 Returns: 

874 The newly created worktree repository 

875 

876 Raises: 

877 ValueError: If the path already exists (and exist_ok is False) or branch is already checked out 

878 """ 

879 from .repo import Repo as RepoClass 

880 

881 path = os.fspath(path) 

882 if isinstance(path, bytes): 

883 path = os.fsdecode(path) 

884 

885 # Check if path already exists 

886 if os.path.exists(path) and not exist_ok: 

887 raise ValueError(f"Path already exists: {path}") 

888 

889 # Normalize branch name 

890 if branch is not None: 

891 if isinstance(branch, str): 

892 branch = branch.encode() 

893 if not branch.startswith(b"refs/heads/"): 

894 branch = b"refs/heads/" + branch 

895 

896 # Check if branch is already checked out in another worktree 

897 if branch and not force: 

898 for wt in list_worktrees(repo): 

899 if wt.branch == branch: 

900 raise ValueError( 

901 f"Branch {branch.decode()} is already checked out at {wt.path}" 

902 ) 

903 

904 # Determine what to checkout 

905 if commit is not None: 

906 checkout_ref = commit 

907 detach = True 

908 elif branch is not None: 

909 # Check if branch exists 

910 try: 

911 checkout_ref = repo.refs[branch] 

912 except KeyError: 

913 if commit is None: 

914 # Create new branch from HEAD 

915 checkout_ref = repo.head() 

916 repo.refs[branch] = checkout_ref 

917 else: 

918 # Create new branch from specified commit 

919 checkout_ref = commit 

920 repo.refs[branch] = checkout_ref 

921 else: 

922 # Default to current HEAD 

923 checkout_ref = repo.head() 

924 detach = True 

925 

926 # Create the worktree directory 

927 os.makedirs(path, exist_ok=exist_ok) 

928 

929 # Initialize the worktree 

930 identifier = os.path.basename(path) 

931 wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier) 

932 

933 # Set HEAD appropriately 

934 if detach: 

935 # Detached HEAD - write SHA directly to HEAD 

936 with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f: 

937 f.write(checkout_ref + b"\n") 

938 else: 

939 # Point to branch 

940 wt_repo.refs.set_symbolic_ref(b"HEAD", branch) 

941 

942 # Reset index to match HEAD 

943 wt_repo.get_worktree().reset_index() 

944 

945 return wt_repo 

946 

947 

948def remove_worktree( 

949 repo: Repo, path: str | bytes | os.PathLike, force: bool = False 

950) -> None: 

951 """Remove a worktree. 

952 

953 Args: 

954 repo: The main repository 

955 path: Path to the worktree to remove 

956 force: Force removal even if there are local changes 

957 

958 Raises: 

959 ValueError: If the worktree doesn't exist, has local changes, or is locked 

960 """ 

961 path = os.fspath(path) 

962 if isinstance(path, bytes): 

963 path = os.fsdecode(path) 

964 

965 # Don't allow removing the main worktree 

966 if os.path.abspath(path) == os.path.abspath(repo.path): 

967 raise ValueError("Cannot remove the main working tree") 

968 

969 # Find the worktree 

970 worktree_found = False 

971 worktree_id = None 

972 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

973 

974 if os.path.isdir(worktrees_dir): 

975 for entry in os.listdir(worktrees_dir): 

976 worktree_path = os.path.join(worktrees_dir, entry) 

977 gitdir_path = os.path.join(worktree_path, GITDIR) 

978 

979 try: 

980 with open(gitdir_path, "rb") as f: 

981 gitdir_contents = f.read().strip() 

982 wt_path = os.fsdecode(gitdir_contents) 

983 if not os.path.isabs(wt_path): 

984 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

985 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

986 

987 if os.path.abspath(wt_dir) == os.path.abspath(path): 

988 worktree_found = True 

989 worktree_id = entry 

990 break 

991 except (FileNotFoundError, PermissionError): 

992 continue 

993 

994 if not worktree_found: 

995 raise ValueError(f"Worktree not found: {path}") 

996 

997 assert worktree_id is not None # Should be set if worktree_found is True 

998 worktree_control_dir = os.path.join(worktrees_dir, worktree_id) 

999 

1000 # Check if locked 

1001 if os.path.exists(os.path.join(worktree_control_dir, "locked")): 

1002 if not force: 

1003 raise ValueError(f"Worktree is locked: {path}") 

1004 

1005 # Check for local changes if not forcing 

1006 if not force and os.path.exists(path): 

1007 # TODO: Check for uncommitted changes in the worktree 

1008 pass 

1009 

1010 # Remove the working directory 

1011 if os.path.exists(path): 

1012 shutil.rmtree(path) 

1013 

1014 # Remove the administrative files 

1015 shutil.rmtree(worktree_control_dir) 

1016 

1017 

1018def prune_worktrees( 

1019 repo: Repo, expire: int | None = None, dry_run: bool = False 

1020) -> list[str]: 

1021 """Prune worktree administrative files for missing worktrees. 

1022 

1023 Args: 

1024 repo: The main repository 

1025 expire: Only prune worktrees older than this many seconds 

1026 dry_run: Don't actually remove anything, just report what would be removed 

1027 

1028 Returns: 

1029 List of pruned worktree identifiers 

1030 """ 

1031 pruned: list[str] = [] 

1032 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1033 

1034 if not os.path.isdir(worktrees_dir): 

1035 return pruned 

1036 

1037 current_time = time.time() 

1038 

1039 for entry in os.listdir(worktrees_dir): 

1040 worktree_path = os.path.join(worktrees_dir, entry) 

1041 if not os.path.isdir(worktree_path): 

1042 continue 

1043 

1044 # Skip locked worktrees 

1045 if os.path.exists(os.path.join(worktree_path, "locked")): 

1046 continue 

1047 

1048 should_prune = False 

1049 

1050 # Check if gitdir exists and points to valid location 

1051 gitdir_path = os.path.join(worktree_path, GITDIR) 

1052 try: 

1053 with open(gitdir_path, "rb") as f: 

1054 gitdir_contents = f.read().strip() 

1055 wt_path = os.fsdecode(gitdir_contents) 

1056 if not os.path.isabs(wt_path): 

1057 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1058 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1059 

1060 if not os.path.exists(wt_dir): 

1061 should_prune = True 

1062 except (FileNotFoundError, PermissionError): 

1063 should_prune = True 

1064 

1065 # Check expiry time if specified 

1066 if should_prune and expire is not None: 

1067 stat_info = os.stat(worktree_path) 

1068 age = current_time - stat_info.st_mtime 

1069 if age < expire: 

1070 should_prune = False 

1071 

1072 if should_prune: 

1073 pruned.append(entry) 

1074 if not dry_run: 

1075 shutil.rmtree(worktree_path) 

1076 

1077 return pruned 

1078 

1079 

1080def lock_worktree( 

1081 repo: Repo, path: str | bytes | os.PathLike, reason: str | None = None 

1082) -> None: 

1083 """Lock a worktree to prevent it from being pruned. 

1084 

1085 Args: 

1086 repo: The main repository 

1087 path: Path to the worktree to lock 

1088 reason: Optional reason for locking 

1089 """ 

1090 worktree_id = _find_worktree_id(repo, path) 

1091 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1092 

1093 lock_path = os.path.join(worktree_control_dir, "locked") 

1094 with open(lock_path, "w") as f: 

1095 if reason: 

1096 f.write(reason) 

1097 

1098 

1099def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike) -> None: 

1100 """Unlock a worktree. 

1101 

1102 Args: 

1103 repo: The main repository 

1104 path: Path to the worktree to unlock 

1105 """ 

1106 worktree_id = _find_worktree_id(repo, path) 

1107 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1108 

1109 lock_path = os.path.join(worktree_control_dir, "locked") 

1110 if os.path.exists(lock_path): 

1111 os.remove(lock_path) 

1112 

1113 

1114def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike) -> str: 

1115 """Find the worktree identifier for the given path. 

1116 

1117 Args: 

1118 repo: The main repository 

1119 path: Path to the worktree 

1120 

1121 Returns: 

1122 The worktree identifier 

1123 

1124 Raises: 

1125 ValueError: If the worktree is not found 

1126 """ 

1127 path = os.fspath(path) 

1128 if isinstance(path, bytes): 

1129 path = os.fsdecode(path) 

1130 

1131 worktrees_dir = os.path.join(repo.controldir(), WORKTREES) 

1132 

1133 if os.path.isdir(worktrees_dir): 

1134 for entry in os.listdir(worktrees_dir): 

1135 worktree_path = os.path.join(worktrees_dir, entry) 

1136 gitdir_path = os.path.join(worktree_path, GITDIR) 

1137 

1138 try: 

1139 with open(gitdir_path, "rb") as f: 

1140 gitdir_contents = f.read().strip() 

1141 wt_path = os.fsdecode(gitdir_contents) 

1142 if not os.path.isabs(wt_path): 

1143 wt_path = os.path.abspath(os.path.join(worktree_path, wt_path)) 

1144 wt_dir = os.path.dirname(wt_path) # Remove .git suffix 

1145 

1146 if os.path.abspath(wt_dir) == os.path.abspath(path): 

1147 return entry 

1148 except (FileNotFoundError, PermissionError): 

1149 continue 

1150 

1151 raise ValueError(f"Worktree not found: {path}") 

1152 

1153 

1154def move_worktree( 

1155 repo: Repo, 

1156 old_path: str | bytes | os.PathLike, 

1157 new_path: str | bytes | os.PathLike, 

1158) -> None: 

1159 """Move a worktree to a new location. 

1160 

1161 Args: 

1162 repo: The main repository 

1163 old_path: Current path of the worktree 

1164 new_path: New path for the worktree 

1165 

1166 Raises: 

1167 ValueError: If the worktree doesn't exist or new path already exists 

1168 """ 

1169 old_path = os.fspath(old_path) 

1170 new_path = os.fspath(new_path) 

1171 if isinstance(old_path, bytes): 

1172 old_path = os.fsdecode(old_path) 

1173 if isinstance(new_path, bytes): 

1174 new_path = os.fsdecode(new_path) 

1175 

1176 # Don't allow moving the main worktree 

1177 if os.path.abspath(old_path) == os.path.abspath(repo.path): 

1178 raise ValueError("Cannot move the main working tree") 

1179 

1180 # Check if new path already exists 

1181 if os.path.exists(new_path): 

1182 raise ValueError(f"Path already exists: {new_path}") 

1183 

1184 # Find the worktree 

1185 worktree_id = _find_worktree_id(repo, old_path) 

1186 worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id) 

1187 

1188 # Move the actual worktree directory 

1189 shutil.move(old_path, new_path) 

1190 

1191 # Update the gitdir file in the worktree 

1192 gitdir_file = os.path.join(new_path, ".git") 

1193 

1194 # Update the gitdir pointer in the control directory 

1195 with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f: 

1196 f.write(os.fsencode(gitdir_file) + b"\n") 

1197 

1198 

1199@contextmanager 

1200def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]: 

1201 """Create a temporary worktree that is automatically cleaned up. 

1202 

1203 Args: 

1204 repo: Dulwich repository object 

1205 prefix: Prefix for the temporary directory name 

1206 

1207 Yields: 

1208 Worktree object 

1209 """ 

1210 temp_dir = None 

1211 worktree = None 

1212 

1213 try: 

1214 # Create temporary directory 

1215 temp_dir = tempfile.mkdtemp(prefix=prefix) 

1216 

1217 # Add worktree 

1218 worktree = repo.worktrees.add(temp_dir, exist_ok=True) 

1219 

1220 yield worktree 

1221 

1222 finally: 

1223 # Clean up worktree registration 

1224 if worktree: 

1225 repo.worktrees.remove(worktree.path) 

1226 

1227 # Clean up temporary directory 

1228 if temp_dir and Path(temp_dir).exists(): 

1229 shutil.rmtree(temp_dir)