Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

543 statements  

1# worktree.py -- Working tree operations for Git repositories 

2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Working tree operations for Git repositories.""" 

23 

24from __future__ import annotations 

25 

26import builtins 

27import os 

28import shutil 

29import stat 

30import sys 

31import tempfile 

32import time 

33import warnings 

34from collections.abc import Iterable, Iterator 

35from contextlib import contextmanager 

36from pathlib import Path 

37 

38from .errors import CommitError, HookError 

39from .objects import Commit, ObjectID, Tag, Tree 

40from .refs import SYMREF, Ref 

41from .repo import ( 

42 GITDIR, 

43 WORKTREES, 

44 Repo, 

45 check_user_identity, 

46 get_user_identity, 

47) 

48 

49 

class WorkTreeInfo:
    """Snapshot describing the state of one worktree.

    Attributes:
      path: Filesystem location of the worktree
      head: SHA of the commit HEAD currently resolves to, if known
      branch: Ref HEAD points at, or None when HEAD is detached
      bare: True when the repository is bare
      detached: True when HEAD is detached
      locked: True when the worktree is locked
      prunable: True when the worktree may be pruned
      lock_reason: Text explaining the lock (only meaningful when locked)
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: bytes | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        self.path, self.head, self.branch = path, head, branch
        self.bare, self.detached = bare, detached
        self.locked, self.prunable = locked, prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        return (
            f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, "
            f"detached={self.detached})"
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        # Two infos are equal only when every recorded field matches.
        compared = (
            "path",
            "head",
            "branch",
            "bare",
            "detached",
            "locked",
            "prunable",
            "lock_reason",
        )
        return all(getattr(self, name) == getattr(other, name) for name in compared)

    def open(self) -> WorkTree:
        """Open the worktree described by this object.

        Returns:
          A WorkTree bound to a Repo opened at ``self.path``

        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)

114 

115 

class WorkTreeContainer:
    """Facade over the worktree management functions of this module.

    Plays the same role for worktrees that RefsContainer plays for
    references: every method forwards to the matching module-level
    function, passing the repository this container was created for.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
          repo: The repository whose worktrees are managed
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        return list_worktrees(self._repo)

    def add(
        self,
        path: str | bytes | os.PathLike,
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree.

        Args:
          path: Location at which the worktree is created
          branch: Branch to check out in the new worktree
          commit: Specific commit to check out (results in detached HEAD)
          force: Create even if the branch is checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: If True, an already existing directory is not an error

        Returns:
          The newly created worktree repository
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike, force: bool = False) -> None:
        """Delete a worktree.

        Args:
          path: Location of the worktree to delete
          force: Delete even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Drop administrative files of worktrees that no longer exist.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Report what would be removed without removing it

        Returns:
          List of pruned worktree identifiers
        """
        # ``builtins.list`` is spelled out because ``list`` is a method here.
        return prune_worktrees(self._repo, expire=expire, dry_run=dry_run)

    def move(
        self, old_path: str | bytes | os.PathLike, new_path: str | bytes | os.PathLike
    ) -> None:
        """Relocate a worktree.

        Args:
          old_path: Where the worktree currently lives
          new_path: Where the worktree should live afterwards
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(self, path: str | bytes | os.PathLike, reason: str | None = None) -> None:
        """Lock a worktree so that it is not pruned.

        Args:
          path: Location of the worktree to lock
          reason: Optional explanation stored with the lock
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike) -> None:
        """Release the lock on a worktree.

        Args:
          path: Location of the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield a WorkTreeInfo for each worktree."""
        for info in self.list():
            yield info

225 

226 

class WorkTree:
    """Working tree operations for a Git repository.

    Groups the operations that involve the checked-out files and the index
    of a repository: staging and unstaging paths, creating commits,
    resetting the index and managing sparse-checkout patterns.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
          repo: The repository this working tree belongs to
          path: Path to the working tree directory; stored as an absolute
            ``str`` in ``self.path``
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            raw_path = os.fsdecode(raw_path)
        self.path: str = os.path.abspath(raw_path)

    def stage(
        self,
        fs_paths: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    ) -> None:
        """Stage a set of paths.

        Paths that no longer exist on disk, or that are neither regular
        files, symlinks nor directories yielding an index entry, are removed
        from the index.

        Args:
          fs_paths: A single path or an iterable of paths, relative to the
            repository path

        Raises:
          ValueError: If one of the paths is absolute
        """
        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        root_path_bytes = os.fsencode(self.path)

        # Accept a single path as a convenience.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()

        def drop_entry(tree_path: bytes) -> None:
            # Removing a path that is not in the index is a no-op.
            try:
                del index[tree_path]
            except KeyError:
                pass

        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                drop_entry(tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    drop_entry(tree_path)
            elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Sockets, devices, fifos etc. cannot be tracked.
                drop_entry(tree_path)
            else:
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific files in the index.

        Paths present in the HEAD tree get their index entry restored from
        that tree; paths absent from it (newly added files) are removed from
        the index.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: If a path is neither in the HEAD tree nor in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()
        try:
            # Resolve HEAD once; it is needed for the tree and the timestamps.
            head_commit = self._repo[b"HEAD"]
            tree_id = head_commit.tree
        except KeyError:
            # no head mean no commit in the repo
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # The file may have been deleted from disk; stat fields then
            # fall back to zero.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            index_entry = IndexEntry(
                ctime=(head_commit.commit_time, 0),
                mtime=(head_commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self._repo[tree_entry[1]].data),
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | str | None = False,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key,
            pass None to follow the ``commit.gpgSign`` configuration)

        Returns:
          New commit SHA1

        Raises:
          CommitError: If a hook rejects the commit or the ref changed
            while the commit was being created
          ValueError: If no message is available or ``tree`` is malformed
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        should_sign = sign
        if sign is None:
            # Fall back to commit.gpgSign when sign is not explicitly set;
            # the config stack fetched above is reused.
            try:
                should_sign = config.get_boolean((b"commit",), b"gpgSign")
            except KeyError:
                should_sign = False  # Default to not signing if no config
        keyid = sign if isinstance(sign, str) else None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                c.parents = merge_heads

        # Handle message after parents are set
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # The ref does not exist yet: this is the first commit on it.
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: bytes | None = None) -> None:
        """Reset the index back to a specific tree.

        Honors ``core.filemode``, ``core.protectNTFS``, ``core.protectHFS``
        and ``core.symlinks`` from the repository configuration.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[b"HEAD"]
            if isinstance(head, Tag):
                # HEAD points at a tag object; use the object it refers to.
                _cls, obj = head.object
                head = self._repo.get_object(obj)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The option name is given without the "core." prefix because the
        # section is passed separately.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:
            # With symlinks disabled, write the link target into a plain file.
            def symlink_fn(source, target) -> None:  # type: ignore
                with open(
                    target, "w" + ("b" if isinstance(source, bytes) else "")
                ) as f:
                    f.write(source)

        blob_normalizer = self._repo.get_blob_normalizer()
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout."""
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False
        return sc_cone == b"true"

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Blank lines are skipped and surrounding whitespace is stripped.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                stripped = (line.strip() for line in f)
                return [pattern for pattern in stripped if pattern]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(pat + "\n" for pat in patterns)

    def set_cone_mode_patterns(self, dirs: list[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` followed by ``!/*/``. For each
        directory in ``dirs`` a ``/<dir>/`` line is then appended, which
        re-includes that directory and everything under it. Leading and
        trailing slashes are stripped from each name, empty names are
        ignored, and the same line is never added twice.

        Args:
          dirs: Directories to include; None or empty writes only the two
            base patterns.
        """
        patterns = ["/*", "!/*/"]
        for d in dirs or ():
            d = d.strip("/")
            line = f"/{d}/"
            if d and line not in patterns:
                patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)

687 

688 

def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Read the lock reason for a worktree.

    The ``locked`` file is decoded as UTF-8 regardless of the process
    locale; bytes that are not valid UTF-8 are replaced rather than raising,
    so an oddly encoded reason cannot break callers.

    Args:
      worktree_path: Path to the worktree's administrative directory

    Returns:
      The lock reason (possibly an empty string) if the worktree is locked,
      None if there is no ``locked`` file or it cannot be read
    """
    locked_path = os.path.join(worktree_path, "locked")
    if not os.path.exists(locked_path):
        return None

    try:
        with open(locked_path, encoding="utf-8", errors="replace") as f:
            return f.read().strip()
    except (FileNotFoundError, PermissionError):
        # The file vanished or is unreadable: treat as not locked.
        return None

707 

708 

def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The main worktree always comes first, followed by one entry for each
    directory under the ``worktrees`` administrative directory whose
    ``gitdir`` file can be read.

    Args:
      repo: The repository to list worktrees for

    Returns:
      A list of WorkTreeInfo objects
    """
    worktrees = []

    # A repository without any commit has no resolvable HEAD; report the
    # main worktree with head=None instead of failing.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None

    # Add main worktree
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
            if head_contents.startswith(SYMREF):
                ref_name = head_contents[len(SYMREF) :].strip()
                main_wt_info.branch = ref_name
            else:
                main_wt_info.detached = True
                main_wt_info.branch = None
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_path):
                continue

            wt_info = WorkTreeInfo(
                path="",  # Will be set below
                bare=False,
                detached=False,
                locked=False,
                prunable=False,
            )

            # Read gitdir to get actual worktree path
            gitdir_path = os.path.join(worktree_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    # Convert relative path to absolute if needed
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_info.path = os.path.dirname(wt_path)  # Remove .git suffix
            except (FileNotFoundError, PermissionError):
                # Worktree directory is missing, skip it
                # TODO: Consider adding these as prunable worktrees with a placeholder path
                continue

            # Check if worktree path exists
            if wt_info.path and not os.path.exists(wt_info.path):
                wt_info.prunable = True

            # Read HEAD
            head_path = os.path.join(worktree_path, "HEAD")
            try:
                with open(head_path, "rb") as f:
                    head_contents = f.read().strip()
                    if head_contents.startswith(SYMREF):
                        ref_name = head_contents[len(SYMREF) :].strip()
                        wt_info.branch = ref_name
                        # Resolve ref to get commit sha
                        try:
                            wt_info.head = repo.refs[ref_name]
                        except KeyError:
                            wt_info.head = None
                    else:
                        wt_info.detached = True
                        wt_info.branch = None
                        wt_info.head = head_contents
            except (FileNotFoundError, PermissionError):
                wt_info.head = None
                wt_info.branch = None

            # Check if locked; an empty reason still means "locked".
            lock_reason = read_worktree_lock_reason(worktree_path)
            if lock_reason is not None:
                wt_info.locked = True
                wt_info.lock_reason = lock_reason

            worktrees.append(wt_info)

    return worktrees

811 

812 

def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike,
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    When ``commit`` is given it is checked out with a detached HEAD. When
    only ``branch`` is given the branch is checked out, and created at the
    current HEAD if it does not exist. With neither, the current HEAD is
    checked out detached.

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (creates if doesn't exist)
      commit: Specific commit to checkout (results in detached HEAD)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists

    Returns:
      The newly created worktree repository

    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        if not branch.startswith(b"refs/heads/"):
            branch = b"refs/heads/" + branch

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it at the current HEAD.
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier is taken from the absolute
    # path so that a trailing separator ("wt/") or "." still yields the
    # directory name rather than an empty or meaningless identifier.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        wt_repo.refs.set_symbolic_ref(b"HEAD", branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo

906 

907 

def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike, force: bool = False
) -> None:
    """Remove a worktree.

    Deletes the working directory (if it still exists) and the matching
    administrative directory under ``worktrees``.

    Args:
      repo: The main repository
      path: Path to the worktree to remove
      force: Force removal even if there are local changes or the worktree
        is locked

    Raises:
      ValueError: If the path is the main worktree, the worktree doesn't
        exist, or it is locked and ``force`` is not set
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    target = os.path.abspath(path)

    # Don't allow removing the main worktree
    if target == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the administrative directory whose gitdir file points at `path`
    worktree_id = None
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Stray files are not worktree entries; opening "<file>/gitdir"
            # would raise NotADirectoryError.
            if not os.path.isdir(worktree_path):
                continue
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
            except (FileNotFoundError, PermissionError):
                continue

            wt_path = os.fsdecode(gitdir_contents)
            if not os.path.isabs(wt_path):
                wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
            wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

            if os.path.abspath(wt_dir) == target:
                worktree_id = entry
                break

    if worktree_id is None:
        raise ValueError(f"Worktree not found: {path}")

    worktree_control_dir = os.path.join(worktrees_dir, worktree_id)

    # A locked worktree may only be removed with force
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)

976 

977 

def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Remove administrative data of worktrees whose directory is gone.

    An entry under the ``worktrees`` administrative directory is a
    candidate when its ``gitdir`` file is missing or unreadable, or when
    the worktree directory it points to no longer exists. Locked entries
    are never touched.

    Args:
      repo: The main repository
      expire: Only prune entries whose administrative directory was last
        modified at least this many seconds ago
      dry_run: Report the candidates without deleting anything

    Returns:
      List of pruned worktree identifiers
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue
        # Locked worktrees are exempt from pruning.
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        # Decide whether the recorded worktree location is gone.
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                recorded = os.fsdecode(fh.read().strip())
                if not os.path.isabs(recorded):
                    recorded = os.path.abspath(os.path.join(admin_dir, recorded))
                # The gitdir file points at "<worktree>/.git".
                stale = not os.path.exists(os.path.dirname(recorded))
        except (FileNotFoundError, PermissionError):
            stale = True

        if not stale:
            continue

        # Entries younger than the expiry window are kept.
        if expire is not None and now - os.stat(admin_dir).st_mtime < expire:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed

1038 

1039 

def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike, reason: str | None = None
) -> None:
    """Mark a worktree as locked so that pruning skips it.

    The lock is a file named ``locked`` inside the worktree's administrative
    directory; an existing lock file is overwritten.

    Args:
      repo: The main repository
      path: Path to the worktree to lock
      reason: Optional reason for locking; written into the lock file when
        non-empty

    Raises:
      ValueError: If no registered worktree matches ``path``
    """
    # Resolve the identifier first so an unknown worktree fails before any
    # file is created.
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    with open(marker, "w") as lock_file:
        if reason:
            lock_file.write(reason)

1057 

1058 

def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike) -> None:
    """Remove the lock from a worktree, if it has one.

    Unlocking a worktree that is not locked is a no-op.

    Args:
      repo: The main repository
      path: Path to the worktree to unlock

    Raises:
      ValueError: If no registered worktree matches ``path``
    """
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    if not os.path.exists(marker):
        return
    os.remove(marker)

1072 

1073 

def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike) -> str:
    """Find the worktree identifier for the given path.

    Each entry under the repository's worktrees directory is inspected: its
    ``gitdir`` file names the ``.git`` file of a linked worktree, and the
    parent directory of that file is compared (as an absolute path) with
    ``path``.

    Args:
      repo: The main repository
      path: Path to the worktree

    Returns:
      The worktree identifier

    Raises:
      ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                wt_path = os.fsdecode(gitdir_contents)
                if not os.path.isabs(wt_path):
                    # Relative pointers are resolved against the entry's
                    # administrative directory.
                    wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                if os.path.abspath(wt_dir) == os.path.abspath(path):
                    return entry
            except (
                FileNotFoundError,
                PermissionError,
                NotADirectoryError,
                IsADirectoryError,
            ):
                # Entries that are not readable registrations (including a
                # stray regular file in the worktrees directory, or a gitdir
                # that is itself a directory) are skipped instead of aborting
                # the whole lookup.
                continue

    raise ValueError(f"Worktree not found: {path}")

1112 

1113 

def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike,
    new_path: str | bytes | os.PathLike,
) -> None:
    """Move a worktree to a new location.

    The working directory is moved on disk and the ``gitdir`` pointer in the
    worktree's administrative directory is rewritten to the new location.

    Args:
      repo: The main repository
      old_path: Current path of the worktree
      new_path: New path for the worktree

    Raises:
      ValueError: If the worktree doesn't exist or new path already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Location of the moved worktree's .git file. It is recorded as an
    # absolute path: a relative pointer is later resolved against the
    # administrative directory rather than the current working directory,
    # so writing a CWD-relative ``new_path`` verbatim would make the moved
    # worktree unfindable.
    gitdir_file = os.path.abspath(os.path.join(new_path, ".git"))

    # Update the gitdir pointer in the control directory
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")

1157 

1158 

@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered through
    ``repo.worktrees.add``. On exit the registration is removed and the
    directory is deleted, even if removing the registration fails.

    Args:
      repo: Dulwich repository object
      prefix: Prefix for the temporary directory name

    Yields:
      The object returned by ``repo.worktrees.add`` for the new worktree
    """
    temp_dir = None
    worktree = None

    try:
        # Create temporary directory
        temp_dir = tempfile.mkdtemp(prefix=prefix)

        # Add worktree; the directory already exists, hence exist_ok=True
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)

        yield worktree

    finally:
        try:
            # Clean up worktree registration
            if worktree is not None:
                repo.worktrees.remove(worktree.path)
        finally:
            # Clean up temporary directory. This runs in its own ``finally``
            # so a failure while removing the registration cannot leak the
            # directory on disk.
            if temp_dir and Path(temp_dir).exists():
                shutil.rmtree(temp_dir)

1189 shutil.rmtree(temp_dir)