Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

687 statements  

1# worktree.py -- Working tree operations for Git repositories 

2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Working tree operations for Git repositories.""" 

23 

24from __future__ import annotations 

25 

26__all__ = [ 

27 "WorkTree", 

28 "WorkTreeContainer", 

29 "WorkTreeInfo", 

30 "add_worktree", 

31 "list_worktrees", 

32 "lock_worktree", 

33 "move_worktree", 

34 "prune_worktrees", 

35 "read_worktree_lock_reason", 

36 "remove_worktree", 

37 "repair_worktree", 

38 "temporary_worktree", 

39 "unlock_worktree", 

40] 

41 

42import builtins 

43import os 

44import shutil 

45import stat 

46import sys 

47import tempfile 

48import time 

49import warnings 

50from collections.abc import Callable, Iterable, Iterator, Sequence 

51from contextlib import contextmanager 

52from pathlib import Path 

53from typing import TYPE_CHECKING, Any 

54 

55from .errors import CommitError, HookError 

56from .objects import Blob, Commit, ObjectID, Tag, Tree 

57 

58if TYPE_CHECKING: 

59 from .config import Config 

60from .refs import SYMREF, Ref, local_branch_name 

61from .repo import ( 

62 GITDIR, 

63 WORKTREES, 

64 Repo, 

65 check_user_identity, 

66 get_user_identity, 

67) 

68from .trailers import add_trailer_to_message 

69 

70 

71def _should_use_relative_paths( 

72 repo: Repo, 

73 relative_paths: bool | None, 

74 existing_path: bytes | None = None, 

75) -> bool: 

76 """Determine whether to use relative paths for gitdir references. 

77 

78 Args: 

79 repo: The repository 

80 relative_paths: Explicit preference (True/False) or None to check config 

81 existing_path: Optional existing path to check format (for preserving format) 

82 

83 Returns: 

84 True if relative paths should be used, False otherwise 

85 """ 

86 if relative_paths is not None: 

87 return relative_paths 

88 

89 # Check config 

90 config = repo.get_config() 

91 try: 

92 use_relative = config.get_boolean( 

93 (b"worktree",), b"useRelativePaths", default=False 

94 ) 

95 if use_relative: 

96 return True 

97 except KeyError: 

98 pass 

99 

100 # Preserve existing format if available 

101 if existing_path is not None: 

102 return not os.path.isabs(os.fsdecode(existing_path)) 

103 

104 return False 

105 

106 

107def _compute_gitdir_path( 

108 repo: Repo, 

109 gitdir_file: str, 

110 worktree_control_dir: str, 

111 use_relative: bool, 

112) -> str: 

113 """Compute the gitdir path and enable extension if needed. 

114 

115 Args: 

116 repo: The repository 

117 gitdir_file: Absolute path to the .git file 

118 worktree_control_dir: Absolute path to the worktree control directory 

119 use_relative: Whether to use relative paths 

120 

121 Returns: 

122 The path to write (relative or absolute) 

123 """ 

124 if use_relative: 

125 from .repo import _enable_relative_worktrees_extension 

126 

127 _enable_relative_worktrees_extension(repo) 

128 return os.path.relpath(gitdir_file, worktree_control_dir) 

129 else: 

130 return gitdir_file 

131 

132 

class WorkTreeInfo:
    """Snapshot of the state of one worktree.

    Attributes:
      path: Path to the worktree
      head: Current HEAD commit SHA
      branch: Current branch (if not detached)
      bare: Whether this is a bare repository
      detached: Whether HEAD is detached
      locked: Whether the worktree is locked
      prunable: Whether the worktree can be pruned
      lock_reason: Reason for locking (if locked)
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: Ref | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the given worktree details on the instance.

        Args:
          path: Path to the worktree
          head: Current HEAD commit SHA
          branch: Current branch (if not detached)
          bare: Whether this is a bare repository
          detached: Whether HEAD is detached
          locked: Whether the worktree is locked
          prunable: Whether the worktree can be pruned
          lock_reason: Reason for locking (if locked)
        """
        self.path = path
        self.head = head
        self.branch = branch
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short description showing path, branch and detached state."""
        return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})"

    def __eq__(self, other: object) -> bool:
        """Compare every attribute against another WorkTreeInfo."""
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        compared = (
            "path",
            "head",
            "branch",
            "bare",
            "detached",
            "locked",
            "prunable",
            "lock_reason",
        )
        return all(getattr(self, name) == getattr(other, name) for name in compared)

    def open(self) -> WorkTree:
        """Open this worktree as a WorkTree.

        Returns:
          WorkTree object for this worktree

        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)

211 

212 

class WorkTreeContainer:
    """Object-style facade over the module-level worktree functions.

    Every method delegates to the matching function in this module, passing
    the repository given at construction time. It plays the same role for
    worktrees that RefsContainer plays for references.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
          repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        return list_worktrees(self._repo)

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
        relative_paths: bool | None = None,
    ) -> Repo:
        """Create a new worktree.

        Args:
          path: Path where the new worktree should be created
          branch: Branch to checkout in the new worktree
          commit: Specific commit to checkout (results in detached HEAD)
          force: Force creation even if branch is already checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: If True, do not raise an error if the directory already exists
          relative_paths: If True, use relative paths for gitdir references.
            If None, check worktree.useRelativePaths config (defaults to False)

        Returns:
          The newly created worktree repository
        """
        new_repo = add_worktree(
            self._repo,
            path,
            branch=branch,
            commit=commit,
            force=force,
            detach=detach,
            exist_ok=exist_ok,
            relative_paths=relative_paths,
        )
        return new_repo

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Delete a worktree.

        Args:
          path: Path to the worktree to remove
          force: Force removal even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Drop administrative files of worktrees that no longer exist.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Don't actually remove anything, just report what would be removed

        Returns:
          List of pruned worktree identifiers
        """
        pruned = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
        relative_paths: bool | None = None,
    ) -> None:
        """Relocate a worktree.

        Args:
          old_path: Current path of the worktree
          new_path: New path for the worktree
          relative_paths: If True, use relative paths for gitdir references.
            If None, check worktree.useRelativePaths config or preserve existing format
        """
        move_worktree(self._repo, old_path, new_path, relative_paths=relative_paths)

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Mark a worktree as locked so that it is not pruned.

        Args:
          path: Path to the worktree to lock
          reason: Optional reason for locking
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Remove the lock from a worktree.

        Args:
          path: Path to the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def repair(
        self,
        paths: Sequence[str | bytes | os.PathLike[str]] | None = None,
        relative_paths: bool | None = None,
    ) -> builtins.list[str]:
        """Fix up worktree administrative files.

        Args:
          paths: Optional list of worktree paths to repair. If None, repairs
            connections from the main repository to all linked worktrees.
          relative_paths: If True, use relative paths for gitdir references.
            If None, check worktree.useRelativePaths config or preserve existing format

        Returns:
          List of repaired worktree paths
        """
        repaired = repair_worktree(
            self._repo, paths=paths, relative_paths=relative_paths
        )
        return repaired

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield a WorkTreeInfo for each worktree, in ``list()`` order."""
        for info in self.list():
            yield info

351 

352 

class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, resetting the index and
    maintaining the sparse-checkout pattern file.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
          repo: The repository this working tree belongs to
          path: Path to the working tree directory; stored on ``self.path``
            as an absolute ``str``
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            raw_path = os.fsdecode(raw_path)
        self.path: str = os.path.abspath(raw_path)

    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
        config: Config | None = None,
    ) -> None:
        """Stage a set of paths.

        Paths that no longer exist on disk, and paths that are neither a
        regular file, a symlink nor a directory yielding an index entry, are
        removed from the index.

        Args:
          fs_paths: A single path or an iterable of paths, relative to the
            repository path
          config: Repository configuration. If None, falls back to
            ``self._repo.get_config_stack()``.

        Raises:
          ValueError: If any of the paths is absolute
        """
        if config is None:
            config = self._repo.get_config_stack()
        root_path_bytes = os.fsencode(self.path)

        if isinstance(fs_paths, str | bytes | os.PathLike):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self._repo.open_index(config=config)
        blob_normalizer = self._repo.get_blob_normalizer(config=config)

        def discard(tree_path: bytes) -> None:
            # Removing an entry that is already gone is not an error.
            try:
                del index[tree_path]
            except KeyError:
                pass

        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                discard(tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    discard(tree_path)
            elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Neither a regular file nor a symlink: cannot be staged.
                discard(tree_path)
            else:
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(
        self,
        fs_paths: Sequence[str],
        config: Config | None = None,
    ) -> None:
        """Unstage specific files in the index.

        A path present in the HEAD tree has its index entry reset to the HEAD
        version; a path absent from HEAD (a newly added file) has its index
        entry removed.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
          config: Repository configuration. If None, falls back to
            ``self._repo.get_config_stack()``.

        Raises:
          KeyError: If a path is neither in the HEAD tree nor in the index
        """
        if config is None:
            config = self._repo.get_config_stack()
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index(config=config)

        def remove_added(tree_path: bytes) -> None:
            # Same descriptive error whether or not the repository has a HEAD.
            try:
                del index[tree_path]
            except KeyError as exc:
                raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

        try:
            commit = self._repo[Ref(b"HEAD")]
        except KeyError:
            # No HEAD means no commit in the repo: every staged file is new.
            for fs_path in fs_paths:
                remove_added(_fs_to_tree_path(fs_path))
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # The path is not in HEAD, so the file was being added:
                # drop its index entry.
                remove_added(tree_path)
                continue

            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            # Stat-derived fields are zeroed when the file is missing on disk.
            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: str | bytes | Callable[[Any, Commit], bytes] | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = Ref(b"HEAD"),
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
        config: Config | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        The target ref is read exactly once. The value read becomes the first
        parent and is also the expected old value of the atomic ref update, so
        a ref that is created, moved or deleted while the commit is being
        built makes the update fail with CommitError instead of silently
        overwriting it.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit. If None, the commit.gpgSign config
            decides (defaults to False). Pass True to sign with the key from
            the user.signingkey config (or the vendor default), or pass a str
            containing a Key ID to sign with that specific key.
          signoff: Add Signed-off-by line (DCO) to commit message.
            If None, uses format.signoff config.
          config: Configuration to consult for committer/author identity and
            other commit-time settings. If None, falls back to
            ``self._repo.get_config_stack()``.

        Returns:
          New commit SHA1

        Raises:
          CommitError: If a pre-commit or commit-msg hook fails, or if the
            ref changed while the commit was being created
          ValueError: If no message is available or ``tree`` is malformed
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        if config is None:
            config = self._repo.get_config_stack()

        c = Commit()
        if tree is None:
            index = self._repo.open_index(config=config)
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding

        # Decide whether to sign: explicit argument first, then commit.gpgSign.
        if sign is None:
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Pick the signing key: an explicit Key ID string wins over config.
        keyid = None
        if should_sign:
            if isinstance(sign, str):
                keyid = sign
            else:
                try:
                    keyid_bytes = config.get((b"user",), b"signingkey")
                    keyid = keyid_bytes.decode() if keyid_bytes else None
                except KeyError:
                    keyid = None

        # Read the target ref once. The same value is used for the parent list
        # and as the expected old value of the compare-and-swap below.
        old_head: ObjectID | None = None
        if ref is not None:
            try:
                old_head = self._repo.refs[ref]
            except KeyError:
                old_head = None  # ref does not exist yet (first commit)
        if old_head is None:
            c.parents = list(merge_heads)
        else:
            c.parents = [old_head, *merge_heads]

        # The message callback runs after the parents are set so that it can
        # inspect them.
        if callable(message):
            message = message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add a Signed-off-by trailer carrying the committer identity.
            signoff_identity = committer
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        # Sign (if requested) and store the commit object.
        if should_sign:
            from dulwich.signature import get_signature_vendor

            vendor = get_signature_vendor(config=config)
            c.gpgsig = vendor.sign(c.as_raw_string(), keyid=keyid)
        self._repo.object_store.add_object(c)

        # With ref=None the commit is left dangling and no ref is touched.
        if ref is not None:
            reflog_message = b"commit: " + (
                message.encode() if isinstance(message, str) else message
            )
            if old_head is None:
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=reflog_message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            else:
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=reflog_message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(
        self,
        tree: ObjectID | None = None,
        config: Config | None = None,
    ) -> None:
        """Reset the index back to a specific tree.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
          config: Stacked configuration used for filter setup. If None,
            falls back to ``self._repo.get_config_stack()``. The ``core.*``
            checkout options are read from ``self._repo.get_config()``.
        """
        if config is None:
            config = self._repo.get_config_stack()
        stacked_config = config
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[Ref(b"HEAD")]
            if isinstance(head, Tag):
                # HEAD is an annotated tag: use the object it points at.
                _cls, obj = head.object
                head = self._repo.get_object(obj)

            assert isinstance(head, Commit)
            tree = head.tree

        repo_config = self._repo.get_config()
        honor_filemode = repo_config.get_boolean(
            b"core", b"filemode", os.name != "nt"
        )
        # The option names are given without the "core." prefix because the
        # section is passed separately; with the prefix the lookup never
        # matches and the platform default is always used.
        if repo_config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif repo_config.get_boolean(
            b"core", b"protectHFS", sys.platform == "darwin"
        ):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default

        if repo_config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:

            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: str | bytes,
                dst: str | bytes,
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                # Symlinks disabled: write the link target into a plain file.
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer(config=stacked_config)
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Sets ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` to true
        and writes the repository config back to disk.
        """
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

        The comparison ignores case, since Git boolean values are
        case-insensitive.
        """
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False
        return sc_cone.lower() == b"true"

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Blank lines are skipped and surrounding whitespace is stripped.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                stripped = (line.strip() for line in f)
                return [line for line in stripped if line]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist. Any existing file
        is overwritten; each pattern is written on its own line.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(pat + "\n" for pat in patterns)

    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` followed by ``!/*/``. For each
        directory in ``dirs`` a ``/<dir>/`` line is then appended, which
        re-includes that directory and everything under it despite the
        preceding ``!/*/`` exclusion. Leading and trailing slashes in the
        given names are ignored, empty names are skipped and the same line is
        never added twice.

        Args:
          dirs: Directories to include, or None for top-level files only
        """
        patterns = ["/*", "!/*/"]
        for d in dirs or ():
            d = d.strip("/")
            line = f"/{d}/"
            if d and line not in patterns:
                patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)

924 

925 

def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the reason stored in a worktree's ``locked`` file.

    Args:
      worktree_path: Path to the worktree's administrative directory

    Returns:
      The whitespace-stripped contents of the ``locked`` file (possibly an
      empty string) if the worktree is locked; None if the file does not
      exist or cannot be opened
    """
    locked_path = os.path.join(worktree_path, "locked")
    if os.path.exists(locked_path):
        try:
            with open(locked_path) as f:
                reason = f.read()
        except (FileNotFoundError, PermissionError):
            # The file vanished or is unreadable: report as not locked.
            return None
        return reason.strip()
    return None

944 

945 

def _branch_from_head_contents(head_contents: bytes) -> Ref | None:
    """Return the ref a HEAD file points at, or None if it is not a symref."""
    if head_contents.startswith(SYMREF):
        return Ref(head_contents[len(SYMREF) :].strip())
    return None


def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The main worktree always comes first, followed by one entry per
    directory under the repository's worktrees control directory. Linked
    worktrees whose ``gitdir`` file cannot be read are skipped.

    Args:
      repo: The repository to list worktrees for

    Returns:
      A list of WorkTreeInfo objects
    """
    worktrees = []

    # Main worktree. HEAD may not resolve to a commit yet (repository without
    # commits); report head=None then, as is done for linked worktrees.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True
    else:
        main_wt_info.branch = _branch_from_head_contents(head_contents)
        if main_wt_info.branch is None:
            main_wt_info.detached = True

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(worktrees_dir):
        return worktrees

    for entry in os.listdir(worktrees_dir):
        worktree_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_path):
            continue

        # Read gitdir to get actual worktree path
        gitdir_path = os.path.join(worktree_path, GITDIR)
        try:
            with open(gitdir_path, "rb") as f:
                gitdir_contents = f.read().strip()
        except (FileNotFoundError, PermissionError):
            # Worktree directory is missing, skip it
            # TODO: Consider adding these as prunable worktrees with a placeholder path
            continue

        # Convert relative path to absolute if needed
        wt_path = os.fsdecode(gitdir_contents)
        if not os.path.isabs(wt_path):
            wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))

        wt_info = WorkTreeInfo(
            path=os.path.dirname(wt_path),  # Remove .git suffix
            bare=False,
            detached=False,
            locked=False,
            prunable=False,
        )

        # A worktree whose checkout directory is gone can be pruned.
        if wt_info.path and not os.path.exists(wt_info.path):
            wt_info.prunable = True

        # Read HEAD
        head_path = os.path.join(worktree_path, "HEAD")
        try:
            with open(head_path, "rb") as f:
                head_contents = f.read().strip()
        except (FileNotFoundError, PermissionError):
            wt_info.head = None
            wt_info.branch = None
        else:
            branch = _branch_from_head_contents(head_contents)
            wt_info.branch = branch
            if branch is None:
                # Not a symref: the HEAD contents are used as the head value.
                wt_info.detached = True
                wt_info.head = head_contents
            else:
                # Resolve ref to get commit sha
                try:
                    wt_info.head = repo.refs[branch]
                except KeyError:
                    wt_info.head = None

        # Check if locked
        lock_reason = read_worktree_lock_reason(worktree_path)
        if lock_reason is not None:
            wt_info.locked = True
            wt_info.lock_reason = lock_reason

        worktrees.append(wt_info)

    return worktrees

1048 

1049 

def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
    relative_paths: bool | None = None,
) -> Repo:
    """Add a new worktree to the repository.

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (created from the
        repository's HEAD if it doesn't exist)
      commit: Specific commit to checkout (results in detached HEAD, even
        when a branch is also given)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists
      relative_paths: If True, use relative paths for gitdir references.
        If None, check worktree.useRelativePaths config (defaults to False)

    Returns:
      The newly created worktree repository

    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Determine whether to use relative paths
    use_relative = _should_use_relative_paths(repo, relative_paths)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        branch = local_branch_name(branch)

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        # An explicit commit always wins and yields a detached HEAD
        checkout_ref = commit
        detach = True
    elif branch is not None:
        # Check if branch exists
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # commit is None on this path, so the new branch starts at HEAD
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier is taken from the absolute path
    # so that a trailing separator ("some/dir/") does not produce an empty
    # name, which os.path.basename() would return for the raw path.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(
        path, repo, identifier=identifier, relative_paths=use_relative
    )

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        from dulwich.refs import HEADREF

        wt_repo.refs.set_symbolic_ref(HEADREF, branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index(config=wt_repo.get_config_stack())

    return wt_repo

1153 

1154 

def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Remove a worktree.

    Deletes both the working directory (if it still exists) and the
    worktree's administrative directory inside the main repository.

    Args:
      repo: The main repository
      path: Path to the worktree to remove
      force: Remove the worktree even if it is locked

    Raises:
      ValueError: If ``path`` is the main working tree, the worktree doesn't
        exist, or it is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Don't allow removing the main worktree
    if os.path.abspath(path) == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the worktree via the shared lookup helper; it raises
    # ValueError("Worktree not found: ...") when nothing matches.
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Check if locked
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)

1223 

1224 

def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Drop administrative directories of worktrees that no longer exist.

    An entry is considered stale when its gitdir pointer cannot be read or
    when the directory the pointer refers to is gone. Locked entries are
    never pruned.

    Args:
      repo: The main repository
      expire: Only prune entries whose administrative directory was last
        modified at least this many seconds ago
      dry_run: Report what would be pruned without deleting anything

    Returns:
      List of pruned worktree identifiers
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue

        # Locked worktrees are exempt from pruning
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        # Resolve the pointer and see whether its target directory survives
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as pointer:
                target = os.fsdecode(pointer.read().strip())
                if not os.path.isabs(target):
                    target = os.path.abspath(os.path.join(admin_dir, target))
                checkout_dir = os.path.dirname(target)  # Remove .git suffix
            stale = not os.path.exists(checkout_dir)
        except (FileNotFoundError, PermissionError):
            stale = True

        if not stale:
            continue

        # Honour the grace period, measured on the administrative directory
        if expire is not None and now - os.stat(admin_dir).st_mtime < expire:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed

1285 

1286 

def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Mark a worktree as locked by creating its ``locked`` file.

    Any existing ``locked`` file is overwritten.

    Args:
      repo: The main repository
      path: Path to the worktree to lock
      reason: Optional reason for locking; stored as the file's contents
    """
    wt_id = _find_worktree_id(repo, path)
    admin_dir = os.path.join(repo.controldir(), WORKTREES, wt_id)

    # An empty marker file is enough to lock; the reason is optional payload.
    with open(os.path.join(admin_dir, "locked"), "w") as marker:
        marker.write(reason or "")

1304 

1305 

def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Remove the ``locked`` marker of a worktree, if there is one.

    Args:
      repo: The main repository
      path: Path to the worktree to unlock
    """
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    # Unlocking a worktree that is not locked is a no-op.
    if not os.path.exists(marker):
        return
    os.remove(marker)

1319 

1320 

def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Find the worktree identifier for the given path.

    Scans the repository's worktrees directory and returns the name of the
    first administrative directory whose gitdir pointer resolves to ``path``.

    Args:
      repo: The main repository
      path: Path to the worktree

    Returns:
      The worktree identifier

    Raises:
      ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    wanted = os.path.abspath(path)
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Skip stray files: opening "<file>/gitdir" raises
            # NotADirectoryError on POSIX, which is not handled below.
            if not os.path.isdir(worktree_path):
                continue
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    wt_path = os.fsdecode(gitdir_contents)
                    # A relative pointer is relative to the administrative
                    # directory that contains it.
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                if os.path.abspath(wt_dir) == wanted:
                    return entry
            except (FileNotFoundError, PermissionError):
                continue

    raise ValueError(f"Worktree not found: {path}")

1359 

1360 

def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
    relative_paths: bool | None = None,
) -> None:
    """Relocate a linked worktree and update its gitdir pointer.

    Args:
      repo: The main repository
      old_path: Current path of the worktree
      new_path: New path for the worktree
      relative_paths: If True, use relative paths for gitdir references.
        If None, check worktree.useRelativePaths config or preserve existing format

    Raises:
      ValueError: If the worktree doesn't exist or new path already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # The main working tree is not a linked worktree and cannot be moved
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Refuse to overwrite anything at the destination
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Locate the administrative directory of the worktree being moved
    admin_dir = os.path.join(
        repo.controldir(), WORKTREES, _find_worktree_id(repo, old_path)
    )
    pointer_file = os.path.join(admin_dir, GITDIR)

    # Remember the current pointer so its absolute/relative format can be kept
    previous_pointer = None
    try:
        with open(pointer_file, "rb") as src:
            previous_pointer = src.read().strip()
    except (FileNotFoundError, PermissionError):
        pass

    use_relative = _should_use_relative_paths(repo, relative_paths, previous_pointer)

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Point the administrative directory at the .git file in the new location.
    # NOTE(review): the worktree's own .git file is not rewritten here;
    # confirm that is intended when relative paths are in use.
    moved_dot_git = os.path.abspath(os.path.join(new_path, ".git"))
    new_pointer = _compute_gitdir_path(repo, moved_dot_git, admin_dir, use_relative)

    with open(pointer_file, "wb") as dst:
        dst.write(os.fsencode(new_pointer) + b"\n")

1423 

1424 

def repair_worktree(
    repo: Repo,
    paths: Sequence[str | bytes | os.PathLike[str]] | None = None,
    relative_paths: bool | None = None,
) -> list[str]:
    """Repair worktree administrative files.

    This repairs the connection between worktrees and the main repository
    when they have been moved or become corrupted.

    Args:
      repo: The main repository
      paths: Optional list of worktree paths to repair. If None (or empty),
        repairs connections from the main repository to all linked worktrees.
      relative_paths: If True, use relative paths for gitdir references.
        If None, check worktree.useRelativePaths config or preserve existing format

    Returns:
      List of repaired worktree paths

    Raises:
      ValueError: If a specified path is not a valid worktree
    """
    repaired: list[str] = []
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if paths:
        # Repair specific worktrees: rewrite the pointer in the main
        # repository so that it names the worktree's current location.
        for path in paths:
            path_str = os.fspath(path)
            if isinstance(path_str, bytes):
                path_str = os.fsdecode(path_str)
            path_str = os.path.abspath(path_str)

            # Check if this is a linked worktree
            gitdir_file = os.path.join(path_str, ".git")
            if not os.path.exists(gitdir_file):
                raise ValueError(f"Not a valid worktree: {path_str}")

            # Read the .git file to get the worktree control directory
            try:
                with open(gitdir_file, "rb") as f:
                    gitdir_content = f.read().strip()
                    if gitdir_content.startswith(b"gitdir: "):
                        worktree_control_path = gitdir_content[8:].decode()
                    else:
                        raise ValueError(f"Invalid .git file in worktree: {path_str}")
            except (
                FileNotFoundError,
                PermissionError,
                # A .git *directory* fails with IsADirectoryError on POSIX
                # (PermissionError on Windows); report both the same way.
                IsADirectoryError,
                UnicodeDecodeError,
            ) as e:
                raise ValueError(
                    f"Cannot read .git file in worktree: {path_str}"
                ) from e

            # Make the path absolute if it's relative
            if not os.path.isabs(worktree_control_path):
                worktree_control_path = os.path.abspath(
                    os.path.join(path_str, worktree_control_path)
                )

            # Update the gitdir file in the worktree control directory
            gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
            if os.path.exists(gitdir_pointer):
                # Read existing path to check format
                existing_path = None
                try:
                    with open(gitdir_pointer, "rb") as f:
                        existing_path = f.read().strip()
                except (FileNotFoundError, PermissionError):
                    pass

                # Determine which format to use for this worktree
                use_relative = _should_use_relative_paths(
                    repo, relative_paths, existing_path
                )

                # Compute the path to write
                gitdir_path_to_write = _compute_gitdir_path(
                    repo, gitdir_file, worktree_control_path, use_relative
                )

                # Update to point to the current location
                with open(gitdir_pointer, "wb") as f:
                    f.write(os.fsencode(gitdir_path_to_write) + b"\n")
                repaired.append(path_str)
    else:
        # Repair from main repository to all linked worktrees
        if not os.path.isdir(worktrees_dir):
            return repaired

        for entry in os.listdir(worktrees_dir):
            worktree_control_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_control_path):
                continue

            # Read the gitdir file to find where the worktree thinks it is
            gitdir_path = os.path.join(worktree_control_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    old_gitdir_location = os.fsdecode(gitdir_contents)
            except (FileNotFoundError, PermissionError):
                # Can't repair if we can't read the gitdir file
                continue

            # A relative pointer is relative to the administrative directory
            # that contains it, not to the current working directory.
            if not os.path.isabs(old_gitdir_location):
                old_gitdir_location = os.path.abspath(
                    os.path.join(worktree_control_path, old_gitdir_location)
                )

            # Get the worktree directory (remove .git suffix)
            old_worktree_path = os.path.dirname(old_gitdir_location)

            # Check if the .git file exists at the old location
            if os.path.exists(old_gitdir_location):
                # Try to read and update the .git file to ensure it points back correctly
                try:
                    with open(old_gitdir_location, "rb") as f:
                        content = f.read().strip()
                        if content.startswith(b"gitdir: "):
                            current_pointer = content[8:].decode()
                            if not os.path.isabs(current_pointer):
                                current_pointer = os.path.abspath(
                                    os.path.join(old_worktree_path, current_pointer)
                                )

                            # If it doesn't point to the right place, fix it
                            expected_pointer = worktree_control_path
                            if os.path.abspath(current_pointer) != os.path.abspath(
                                expected_pointer
                            ):
                                # Determine which format to use, based on the
                                # raw pointer as stored on disk
                                use_relative = _should_use_relative_paths(
                                    repo, relative_paths, gitdir_contents
                                )

                                # Compute the path to write (from worktree to control dir)
                                pointer_to_write = _compute_gitdir_path(
                                    repo,
                                    worktree_control_path,
                                    old_worktree_path,
                                    use_relative,
                                )

                                # Update the .git file to point to the correct location
                                with open(old_gitdir_location, "wb") as wf:
                                    wf.write(
                                        b"gitdir: "
                                        + os.fsencode(pointer_to_write)
                                        + b"\n"
                                    )
                                repaired.append(old_worktree_path)
                except (PermissionError, IsADirectoryError, UnicodeDecodeError):
                    # Unreadable, undecodable or not a file: leave it alone
                    continue

    return repaired

1574 

1575 

@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered through
    ``repo.worktrees.add``. On exit the worktree is unregistered and the
    directory deleted, whether or not the body raised.

    Args:
      repo: Dulwich repository object
      prefix: Prefix for the temporary directory name

    Yields:
      The object returned by ``repo.worktrees.add`` for the new worktree
    """
    temp_dir = None
    worktree = None

    try:
        # Create temporary directory
        temp_dir = tempfile.mkdtemp(prefix=prefix)

        # Add worktree
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)

        yield worktree

    finally:
        try:
            # Clean up worktree registration
            if worktree:
                repo.worktrees.remove(worktree.path)
        finally:
            # Clean up temporary directory even if unregistering failed, so
            # an error above cannot leak the directory on disk.
            if temp_dir and Path(temp_dir).exists():
                shutil.rmtree(temp_dir)

1606 shutil.rmtree(temp_dir)