1# worktree.py -- Working tree operations for Git repositories
2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
21
22"""Working tree operations for Git repositories."""
23
24from __future__ import annotations
25
26import builtins
27import os
28import shutil
29import stat
30import sys
31import tempfile
32import time
33import warnings
34from collections.abc import Iterable, Iterator
35from contextlib import contextmanager
36from pathlib import Path
37from typing import Any, Callable, Union
38
39from .errors import CommitError, HookError
40from .objects import Blob, Commit, ObjectID, Tag, Tree
41from .refs import SYMREF, Ref
42from .repo import (
43 GITDIR,
44 WORKTREES,
45 Repo,
46 check_user_identity,
47 get_user_identity,
48)
49
50
class WorkTreeInfo:
    """Describe a single worktree attached to a repository.

    Attributes:
        path: Filesystem location of the worktree
        head: SHA of the commit HEAD currently refers to
        branch: Branch HEAD points at (None when detached)
        bare: True when the entry describes a bare repository
        detached: True when HEAD is detached
        locked: True when the worktree is locked
        prunable: True when the worktree can be pruned
        lock_reason: Reason given when the worktree was locked, if any
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: bytes | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the supplied worktree properties on the instance.

        Args:
            path: Filesystem location of the worktree
            head: SHA of the commit HEAD currently refers to
            branch: Branch HEAD points at (None when detached)
            bare: True when the entry describes a bare repository
            detached: True when HEAD is detached
            locked: True when the worktree is locked
            prunable: True when the worktree can be pruned
            lock_reason: Reason given when the worktree was locked, if any
        """
        self.path = path
        self.head = head
        self.branch = branch
        # Boolean state flags.
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short description showing path, branch and detached state."""
        return (
            f"WorkTreeInfo(path={self.path!r}, "
            f"branch={self.branch!r}, "
            f"detached={self.detached})"
        )

    def __eq__(self, other: object) -> bool:
        """Compare every attribute with another WorkTreeInfo.

        Returns NotImplemented for objects of any other type.
        """
        if isinstance(other, WorkTreeInfo):
            compared = (
                "path",
                "head",
                "branch",
                "bare",
                "detached",
                "locked",
                "prunable",
                "lock_reason",
            )
            # all() stops at the first differing attribute.
            return all(
                getattr(self, field) == getattr(other, field) for field in compared
            )
        return NotImplemented

    def open(self) -> WorkTree:
        """Open the repository at this worktree's path and wrap it in a WorkTree.

        Returns:
            WorkTree object for this worktree

        Raises:
            NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)
129
130
class WorkTreeContainer:
    """Manage the set of working trees that belong to one repository.

    Plays the same role for worktrees that RefsContainer plays for
    references. Each method forwards to the matching module-level worktree
    function, supplying the repository given at construction time.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
            repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
            A list of WorkTreeInfo objects
        """
        infos = list_worktrees(self._repo)
        return infos

    def add(
        self,
        path: str | bytes | os.PathLike,
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree.

        Args:
            path: Location at which the new worktree is created
            branch: Branch to check out in the new worktree
            commit: Specific commit to check out (results in detached HEAD)
            force: Create the worktree even if the branch is checked out elsewhere
            detach: Detach HEAD in the new worktree
            exist_ok: If True, do not raise an error if the directory already exists

        Returns:
            The newly created worktree repository
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike, force: bool = False) -> None:
        """Delete a worktree.

        Args:
            path: Location of the worktree to remove
            force: Remove the worktree even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Clean up administrative files of worktrees that no longer exist.

        Args:
            expire: Only prune worktrees older than this many seconds
            dry_run: Report what would be removed without removing anything

        Returns:
            List of pruned worktree identifiers
        """
        pruned_ids = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned_ids

    def move(
        self, old_path: str | bytes | os.PathLike, new_path: str | bytes | os.PathLike
    ) -> None:
        """Relocate a worktree.

        Args:
            old_path: Where the worktree currently lives
            new_path: Where the worktree should live afterwards
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(self, path: str | bytes | os.PathLike, reason: str | None = None) -> None:
        """Lock a worktree so that it is not pruned.

        Args:
            path: Location of the worktree to lock
            reason: Optional explanation recorded with the lock
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike) -> None:
        """Release the lock on a worktree.

        Args:
            path: Location of the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield each worktree reported by list()."""
        for info in self.list():
            yield info
240
241
class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, resetting the index and
    managing the sparse-checkout configuration.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
            repo: The repository this working tree belongs to
            path: Path to the working tree directory. Bytes paths are decoded
                with the filesystem encoding; the stored path is absolute.
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            self.path: str = os.fsdecode(raw_path)
        else:
            self.path = raw_path
        self.path = os.path.abspath(self.path)

    def stage(
        self,
        fs_paths: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    ) -> None:
        """Stage a set of paths.

        Paths that no longer exist on disk, and paths that are neither a
        directory, a regular file nor a symlink, are removed from the index.

        Args:
            fs_paths: A single path or an iterable of paths, relative to the
                repository path

        Raises:
            ValueError: If one of the paths is absolute
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path as a convenience.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        # No index entry could be built for this directory.
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Neither a regular file nor a symlink: not stageable.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    blob = blob_from_path_and_stat(full_path, st)
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self._repo.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific files in the index.

        Entries that exist in the HEAD tree are reset to the HEAD version;
        entries that do not exist there are dropped from the index.

        Args:
            fs_paths: a list of files to unstage,
                relative to the repository path.

        Raises:
            KeyError: If a path is neither in the HEAD tree nor in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()
        try:
            commit = self._repo[b"HEAD"]
        except KeyError:
            # no HEAD means no commit in the repo: just drop the entries
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # The file may be missing from disk; stat fields then default to 0.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | str | None = False,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
            message: Commit message (bytes or callable that takes (repo, commit)
                and returns bytes)
            committer: Committer fullname
            author: Author fullname
            commit_timestamp: Commit timestamp (defaults to now)
            commit_timezone: Commit timestamp timezone (defaults to GMT)
            author_timestamp: Author timestamp (defaults to commit
                timestamp)
            author_timezone: Author timestamp timezone
                (defaults to commit timestamp timezone)
            tree: SHA1 of the tree root to use (if not specified the
                current index will be committed).
            encoding: Encoding (defaults to the i18n.commitEncoding setting,
                when set)
            ref: Optional ref to commit to (defaults to HEAD).
                If None, creates a dangling commit without updating any ref.
            merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
            no_verify: Skip pre-commit and commit-msg hooks
            sign: GPG Sign the commit (defaults to False,
                pass True to use default GPG key,
                pass a str containing Key ID to use a specific GPG key,
                pass None to follow the commit.gpgSign setting)

        Returns:
            New commit SHA1

        Raises:
            CommitError: If a pre-commit or commit-msg hook fails, or if the
                ref changed while the commit was being created
            ValueError: If no message is available or ``tree`` is not a
                40-character hex SHA
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        should_sign = sign
        if sign is None:
            # Follow commit.gpgSign when the caller did not decide; the
            # config stack loaded above is reused.
            try:
                should_sign = config.get_boolean((b"commit",), b"gpgSign")
            except KeyError:
                should_sign = False  # Default to not signing if no config
        keyid = sign if isinstance(sign, str) else None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                # The ref does not exist yet: this is its first commit.
                c.parents = merge_heads

        # Handle message after parents are set, so that a callable can
        # inspect the commit's parents
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                # Only move the ref if it still points at the head read above.
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: bytes | None = None) -> None:
        """Reset the index back to a specific tree.

        The path-element validator is chosen from the ``core.protectNTFS``
        and ``core.protectHFS`` settings, and symlinks are written as plain
        files when ``core.symlinks`` is false.

        Args:
            tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[b"HEAD"]
            if isinstance(head, Tag):
                # HEAD resolved to a tag object: follow it to its target.
                _cls, obj = head.object
                head = self._repo.get_object(obj)

            assert isinstance(head, Commit)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The option name must not repeat the "core." prefix: the section is
        # already passed as the first argument, so a prefixed name would never
        # match the user's core.protectNTFS / core.protectHFS settings.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:

            def symlink_fn(
                src: Union[str, bytes, os.PathLike],
                dst: Union[str, bytes, os.PathLike],
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                # Symlinks are disabled: write the link target into a regular
                # file instead, in binary mode when the target is bytes.
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer()
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Sets ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` to
        ``true`` and writes the configuration back to disk.
        """
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
            return sc_cone == b"true"
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Surrounding whitespace is stripped from each line and blank lines
        are skipped.

        Returns:
            A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                stripped_lines = (line.strip() for line in f)
                return [pattern for pattern in stripped_lines if pattern]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist. Any previous
        content of the file is replaced.

        Args:
            patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            for pat in patterns:
                f.write(pat + "\n")

    def set_cone_mode_patterns(self, dirs: list[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` followed by ``!/*/``. For each
        directory to include, a ``/<dir>/`` line is then appended, which
        re-includes that directory and everything under it despite the
        preceding ``!/*/`` exclusion. The same line is never added twice,
        and entries that are empty once slashes are stripped are ignored.

        Args:
            dirs: Directories to include; leading and trailing slashes are
                ignored. None or an empty list writes only the two base
                patterns.
        """
        patterns = ["/*", "!/*/"]
        if dirs:
            for d in dirs:
                d = d.strip("/")
                line = f"/{d}/"
                if d and line not in patterns:
                    patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)
727
728
def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the reason stored in a worktree's ``locked`` file.

    Args:
        worktree_path: Path to the worktree's administrative directory

    Returns:
        The stripped content of the ``locked`` file when it exists (an empty
        string if the file is empty); None when the file is absent or cannot
        be opened.
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as handle:
                contents = handle.read()
        except (FileNotFoundError, PermissionError):
            # The file vanished or is unreadable: report it as not locked.
            return None
        return contents.strip()
    return None
747
748
def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The main worktree is always reported first. It is followed by one entry
    for each directory under the repository's worktrees administrative
    directory whose ``gitdir`` file can be read; entries without a readable
    ``gitdir`` file are skipped.

    Args:
        repo: The repository to list worktrees for

    Returns:
        A list of WorkTreeInfo objects
    """
    worktrees = []

    # Linked worktrees below report head=None when their branch does not
    # resolve; treat the main worktree the same way instead of letting the
    # lookup failure abort the whole listing.
    # NOTE(review): assumes repo.head() raises KeyError when HEAD does not
    # resolve (e.g. no commit yet) -- confirm against Repo.head().
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None

    # Add main worktree
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
            if head_contents.startswith(SYMREF):
                ref_name = head_contents[len(SYMREF) :].strip()
                main_wt_info.branch = ref_name
            else:
                # HEAD does not start with the symref marker: detached.
                main_wt_info.detached = True
                main_wt_info.branch = None
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_path):
                continue

            wt_info = WorkTreeInfo(
                path="",  # Will be set below
                bare=False,
                detached=False,
                locked=False,
                prunable=False,
            )

            # Read gitdir to get actual worktree path
            gitdir_path = os.path.join(worktree_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    # Convert relative path to absolute if needed
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_info.path = os.path.dirname(wt_path)  # Remove .git suffix
            except (FileNotFoundError, PermissionError):
                # The gitdir file is missing or unreadable, skip this entry
                # TODO: Consider adding these as prunable worktrees with a placeholder path
                continue

            # A worktree whose directory has disappeared can be pruned
            if wt_info.path and not os.path.exists(wt_info.path):
                wt_info.prunable = True

            # Read HEAD
            head_path = os.path.join(worktree_path, "HEAD")
            try:
                with open(head_path, "rb") as f:
                    head_contents = f.read().strip()
                    if head_contents.startswith(SYMREF):
                        ref_name = head_contents[len(SYMREF) :].strip()
                        wt_info.branch = ref_name
                        # Resolve ref to get commit sha
                        try:
                            wt_info.head = repo.refs[ref_name]
                        except KeyError:
                            wt_info.head = None
                    else:
                        # Detached: the HEAD file content is used as the head
                        wt_info.detached = True
                        wt_info.branch = None
                        wt_info.head = head_contents
            except (FileNotFoundError, PermissionError):
                wt_info.head = None
                wt_info.branch = None

            # Check if locked
            lock_reason = read_worktree_lock_reason(worktree_path)
            if lock_reason is not None:
                wt_info.locked = True
                wt_info.lock_reason = lock_reason

            worktrees.append(wt_info)

    return worktrees
851
852
def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike,
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    When ``commit`` is given it takes precedence over ``branch`` and the new
    worktree gets a detached HEAD. When neither is given, the new worktree
    gets a detached HEAD at the repository's current HEAD.

    Args:
        repo: The main repository
        path: Path where the new worktree should be created
        branch: Branch to checkout in the new worktree (created from HEAD if
            it doesn't exist). Names without a ``refs/heads/`` prefix get one.
        commit: Specific commit to checkout (results in detached HEAD)
        force: Force creation even if branch is already checked out elsewhere
        detach: Detach HEAD in the new worktree
        exist_ok: If True, do not raise an error if the directory already exists

    Returns:
        The newly created worktree repository

    Raises:
        ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        if not branch.startswith(b"refs/heads/"):
            branch = b"refs/heads/" + branch

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        # Use the branch if it exists, otherwise create it from HEAD
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Derive the identifier from the absolute path: basename() of a path with
    # a trailing separator (e.g. "wt/") is an empty string, which is not a
    # usable identifier.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        wt_repo.refs.set_symbolic_ref(b"HEAD", branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo
946
947
def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike, force: bool = False
) -> None:
    """Remove a worktree.

    Deletes both the worktree directory (when it still exists) and its
    administrative directory inside the main repository.

    Args:
        repo: The main repository
        path: Path to the worktree to remove
        force: Remove the worktree even if it is locked. Detection of local
            changes is not implemented yet, so it has no other effect.

    Raises:
        ValueError: If ``path`` is the main working tree, the worktree
            doesn't exist, or it is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    target = os.path.abspath(path)

    # Don't allow removing the main worktree
    if target == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the administrative entry whose gitdir file points at the target
    worktree_id = None
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Only directories can be worktree entries; opening "gitdir"
            # beneath a stray regular file would raise NotADirectoryError.
            if not os.path.isdir(worktree_path):
                continue
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                    if os.path.abspath(wt_dir) == target:
                        worktree_id = entry
                        break
            except (FileNotFoundError, PermissionError):
                continue

    if worktree_id is None:
        raise ValueError(f"Worktree not found: {path}")

    worktree_control_dir = os.path.join(worktrees_dir, worktree_id)

    # A locked worktree may only be removed with force
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)
1016
1017
def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Remove administrative data of worktrees whose directory is gone.

    An entry is stale when its ``gitdir`` file cannot be read or when the
    directory it refers to no longer exists. Locked entries are never pruned.

    Args:
        repo: The main repository
        expire: Only prune worktrees older than this many seconds
        dry_run: Don't actually remove anything, just report what would be removed

    Returns:
        List of pruned worktree identifiers
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)

    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue

        # Locked worktrees are left alone
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        # Stale when gitdir is unreadable or its target directory is missing
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as handle:
                recorded = os.fsdecode(handle.read().strip())
                if not os.path.isabs(recorded):
                    recorded = os.path.abspath(os.path.join(admin_dir, recorded))
                # Drop the trailing .git component to get the worktree dir
                stale = not os.path.exists(os.path.dirname(recorded))
        except (FileNotFoundError, PermissionError):
            stale = True

        # Entries younger than the expiry threshold are kept
        if stale and expire is not None:
            age = now - os.stat(admin_dir).st_mtime
            stale = not (age < expire)

        if not stale:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed
1078
1079
def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike, reason: str | None = None
) -> None:
    """Mark a worktree as locked by creating its ``locked`` marker file.

    The marker is placed in the worktree's control directory inside the main
    repository. An already existing marker is truncated and rewritten.

    Args:
        repo: The main repository
        path: Path to the worktree to lock
        reason: Optional text stored in the marker file; nothing is written
            when it is ``None`` or empty

    Raises:
        ValueError: If no registered worktree matches ``path``
    """
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    # Opening in "w" mode creates the marker even when no reason is given
    with open(marker, "w") as marker_file:
        if reason:
            marker_file.write(reason)
1097
1098
def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike) -> None:
    """Remove the ``locked`` marker file of a worktree, if there is one.

    Unlocking a worktree that is not locked is a no-op.

    Args:
        repo: The main repository
        path: Path to the worktree to unlock

    Raises:
        ValueError: If no registered worktree matches ``path``
    """
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    if not os.path.exists(marker):
        # Not locked: nothing to do
        return
    os.remove(marker)
1112
1113
def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike) -> str:
    """Find the worktree identifier for the given path.

    Each sub-directory of the repository's worktrees directory is inspected:
    its ``gitdir`` pointer file names the worktree's ``.git`` file, and the
    parent directory of that file is compared (as an absolute path) with
    ``path``.

    Args:
        repo: The main repository
        path: Path to the worktree

    Returns:
        The worktree identifier, i.e. the name of the matching entry in the
        worktrees directory

    Raises:
        ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Stray regular files are not worktree entries. Without this
            # check, opening "<file>/gitdir" raises NotADirectoryError on
            # POSIX, which the handler below does not cover.
            if not os.path.isdir(worktree_path):
                continue
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                wt_path = os.fsdecode(gitdir_contents)
                if not os.path.isabs(wt_path):
                    # Relative pointers are resolved against the control dir
                    wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                if os.path.abspath(wt_dir) == os.path.abspath(path):
                    return entry
            except (FileNotFoundError, PermissionError):
                # Entry without a readable pointer cannot match; keep looking
                continue

    raise ValueError(f"Worktree not found: {path}")
1152
1153
def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike,
    new_path: str | bytes | os.PathLike,
) -> None:
    """Move a worktree to a new location.

    The working directory is moved with ``shutil.move`` and the ``gitdir``
    pointer in the worktree's control directory is rewritten to name the
    ``.git`` file at the new location. The ``.git`` file inside the worktree
    is not rewritten.

    NOTE(review): a ``locked`` marker is not consulted here, whereas
    ``git worktree move`` refuses to move a locked worktree unless forced;
    confirm whether that difference is intended.

    Args:
        repo: The main repository
        old_path: Current path of the worktree
        new_path: New path for the worktree

    Raises:
        ValueError: If ``old_path`` is the main working tree, if it is not a
            registered worktree, or if ``new_path`` already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Location of the worktree's .git file after the move. It is stored as an
    # absolute path: a relative new_path is relative to the current working
    # directory, but readers of the pointer resolve relative values against
    # the control directory, so a relative value would point at the wrong
    # place and make the worktree look missing.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Update the gitdir pointer in the control directory
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
1197
1198
@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created, registered as a worktree of
    ``repo`` and handed to the ``with`` body. On exit the worktree
    registration is removed and the temporary directory is deleted. The
    directory is deleted even when removing the registration fails; in that
    case the failure is still propagated to the caller.

    Args:
        repo: Dulwich repository object
        prefix: Prefix for the temporary directory name

    Yields:
        Worktree object
    """
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    try:
        # exist_ok is required because mkdtemp has already created the directory
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)
        try:
            yield worktree
        finally:
            # Clean up worktree registration
            if worktree is not None:
                repo.worktrees.remove(worktree.path)
    finally:
        # Clean up temporary directory. This runs in its own ``finally`` so
        # that an error while unregistering the worktree cannot leak it.
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)