1# worktree.py -- Working tree operations for Git repositories
2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
21
22"""Working tree operations for Git repositories."""
23
24from __future__ import annotations
25
26import builtins
27import os
28import shutil
29import stat
30import sys
31import tempfile
32import time
33import warnings
34from collections.abc import Iterable, Iterator
35from contextlib import contextmanager
36from pathlib import Path
37
38from .errors import CommitError, HookError
39from .objects import Commit, ObjectID, Tag, Tree
40from .refs import SYMREF, Ref
41from .repo import (
42 GITDIR,
43 WORKTREES,
44 Repo,
45 check_user_identity,
46 get_user_identity,
47)
48
49
class WorkTreeInfo:
    """Snapshot describing one worktree of a repository.

    Attributes:
        path: Path to the worktree
        head: Current HEAD commit SHA
        branch: Current branch (if not detached)
        bare: Whether this is a bare repository
        detached: Whether HEAD is detached
        locked: Whether the worktree is locked
        prunable: Whether the worktree can be pruned
        lock_reason: Reason for locking (if locked)
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: bytes | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Record the given worktree properties on the instance."""
        # Location and what is checked out there.
        self.path = path
        self.head = head
        self.branch = branch
        # State flags.
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short description showing path, branch and detached state."""
        return (
            f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, "
            f"detached={self.detached})"
        )

    def __eq__(self, other: object) -> bool:
        """Compare all attributes; defer to the other operand for foreign types."""
        if isinstance(other, WorkTreeInfo):
            compared = (
                "path",
                "head",
                "branch",
                "bare",
                "detached",
                "locked",
                "prunable",
                "lock_reason",
            )
            # all() stops at the first differing attribute, in declaration order.
            return all(
                getattr(self, name) == getattr(other, name) for name in compared
            )
        return NotImplemented

    def open(self) -> WorkTree:
        """Open this worktree as a WorkTree.

        Returns:
            WorkTree object for this worktree

        Raises:
            NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)
114
115
class WorkTreeContainer:
    """Object-style facade over the module-level worktree functions.

    Every method forwards to the matching ``*_worktree(s)`` function, passing
    the repository this container was created for. This mirrors the way
    RefsContainer groups reference operations.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
            repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
            A list of WorkTreeInfo objects
        """
        infos = list_worktrees(self._repo)
        return infos

    def add(
        self,
        path: str | bytes | os.PathLike,
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree.

        Args:
            path: Path where the new worktree should be created
            branch: Branch to checkout in the new worktree
            commit: Specific commit to checkout (results in detached HEAD)
            force: Force creation even if branch is already checked out elsewhere
            detach: Detach HEAD in the new worktree
            exist_ok: If True, do not raise an error if the directory already exists

        Returns:
            The newly created worktree repository
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike, force: bool = False) -> None:
        """Delete a worktree.

        Args:
            path: Path to the worktree to remove
            force: Force removal even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Drop administrative files of worktrees that no longer exist.

        Args:
            expire: Only prune worktrees older than this many seconds
            dry_run: Don't actually remove anything, just report what would be removed

        Returns:
            List of pruned worktree identifiers
        """
        pruned_ids = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned_ids

    def move(
        self, old_path: str | bytes | os.PathLike, new_path: str | bytes | os.PathLike
    ) -> None:
        """Relocate a worktree.

        Args:
            old_path: Current path of the worktree
            new_path: New path for the worktree
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(self, path: str | bytes | os.PathLike, reason: str | None = None) -> None:
        """Lock a worktree so that it is not pruned.

        Args:
            path: Path to the worktree to lock
            reason: Optional reason for locking
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike) -> None:
        """Release the lock on a worktree.

        Args:
            path: Path to the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield the WorkTreeInfo of each worktree in turn."""
        for info in self.list():
            yield info
225
226
class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, resetting the index and
    managing the sparse-checkout pattern file.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
            repo: The repository this working tree belongs to
            path: Path to the working tree directory; bytes paths are decoded
                with the filesystem encoding and the result is made absolute
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            raw_path = os.fsdecode(raw_path)
        self.path: str = os.path.abspath(raw_path)

    def stage(
        self,
        fs_paths: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
    ) -> None:
        """Stage a set of paths.

        Paths that no longer exist on disk, and paths that are neither regular
        files, symlinks nor directories, are removed from the index.

        Args:
            fs_paths: A single path or an iterable of paths, relative to the
                repository path

        Raises:
            ValueError: If one of the paths is absolute
        """
        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        # A lone path is accepted as a convenience; normalise to a list.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        root_path_bytes = os.fsencode(self.path)
        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()

        def drop_from_index(tree_path: bytes) -> None:
            # Removing a path that is not in the index is not an error.
            try:
                del index[tree_path]
            except KeyError:
                pass

        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                drop_from_index(tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    drop_from_index(tree_path)
            elif stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
            else:
                # Neither a regular file, a symlink nor a directory.
                drop_from_index(tree_path)
        index.write()

    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific files in the index.

        Each path is reset to its state in the HEAD commit; paths that are
        not in HEAD are removed from the index.

        Args:
            fs_paths: a list of files to unstage,
              relative to the repository path.

        Raises:
            KeyError: If a path is neither in HEAD nor in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()
        try:
            head_commit = self._repo[b"HEAD"]
        except KeyError:
            # No HEAD means there is no commit in the repo yet, so every
            # staged path is a new file: just drop the entries.
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        # Resolve HEAD once instead of once per path.
        tree_id = head_commit.tree
        commit_time = head_commit.commit_time

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc
                continue

            # stat information is optional; the file may be gone from disk.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            index[tree_path] = IndexEntry(
                ctime=(commit_time, 0),
                mtime=(commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self._repo[tree_entry[1]].data),
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )
        index.write()

    def commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | str | None = False,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key,
            pass None to follow the commit.gpgSign configuration)

        Returns:
          New commit SHA1

        Raises:
          CommitError: If a pre-commit or commit-msg hook fails, or if the
            ref changed while the commit was being created
          ValueError: If no message is available or tree is not a 40-byte
            hex sha
        """
        if not no_verify:
            try:
                self._repo.hooks["pre-commit"].execute()
            except HookError as exc:
                raise CommitError(exc) from exc
            except KeyError:  # no hook defined, silent fallthrough
                pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")

        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone

        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        check_user_identity(author)
        c.author = author
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone

        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding

        # Decide whether to sign. The configuration is only consulted when
        # the caller explicitly passes sign=None.
        should_sign = sign
        if sign is None:
            try:
                should_sign = config.get_boolean((b"commit",), b"gpgSign")
            except KeyError:
                should_sign = False  # Default to not signing if no config
        keyid = sign if isinstance(sign, str) else None

        # Read the ref exactly once. The same value is used as the first
        # parent and as the expected value of the compare-and-swap below, so
        # a concurrent update of the ref is detected instead of overwritten.
        old_head = None
        if ref is not None:
            try:
                old_head = self._repo.refs[ref]
            except KeyError:
                old_head = None
        if old_head is None:
            c.parents = merge_heads
        else:
            c.parents = [old_head, *merge_heads]

        # The message may be a callable; it is invoked after the parents are
        # set so that it can inspect them.
        if callable(message):
            message = message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        if no_verify:
            c.message = message
        else:
            try:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
            except HookError as exc:
                raise CommitError(exc) from exc
            except KeyError:  # no hook defined, message not modified
                c.message = message

        if should_sign:
            c.sign(keyid)
        self._repo.object_store.add_object(c)

        if ref is not None:
            reflog_args = {
                "message": b"commit: " + message,
                "committer": committer,
                "timestamp": commit_timestamp,
                "timezone": commit_timezone,
            }
            if old_head is None:
                ok = self._repo.refs.add_if_new(ref, c.id, **reflog_args)
            else:
                ok = self._repo.refs.set_if_equals(
                    ref, old_head, c.id, **reflog_args
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: bytes | None = None) -> None:
        """Reset the index back to a specific tree.

        The ``core.filemode``, ``core.protectNTFS``, ``core.protectHFS`` and
        ``core.symlinks`` settings of the repository are honoured.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[b"HEAD"]
            if isinstance(head, Tag):
                # HEAD points at a tag object: use the object it tags.
                _cls, obj = head.object
                head = self._repo.get_object(obj)
            tree = head.tree

        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The variable name is given without the section prefix; the section
        # is already passed as the first argument.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default

        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:

            def symlink_fn(source, target) -> None:  # type: ignore
                # Symlinks are disabled: write the link target into a plain
                # file instead.
                with open(
                    target, "w" + ("b" if isinstance(source, bytes) else "")
                ) as f:
                    f.write(source)

        blob_normalizer = self._repo.get_blob_normalizer()
        build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Sets ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` to
        ``true`` and writes the configuration back to disk.
        """
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False
        return sc_cone == b"true"

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Blank lines are skipped and surrounding whitespace is stripped.

        Returns:
            A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Args:
            patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(pat + "\n" for pat in patterns)

    def set_cone_mode_patterns(self, dirs: list[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` and ``!/*/``. Each directory to
        include is then added as ``/<dir>/``, which re-includes that directory
        and everything under it after the ``!/*/`` exclusion. Leading and
        trailing slashes in the directory names are ignored, empty names are
        skipped and the same line is never added twice.

        Args:
            dirs: Directories to include, or None for only the top-level files.
        """
        patterns = ["/*", "!/*/"]
        for d in dirs or ():
            d = d.strip("/")
            line = f"/{d}/"
            if d and line not in patterns:
                patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)
687
688
def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the reason stored in a worktree's ``locked`` file.

    Args:
        worktree_path: Path to the worktree's administrative directory

    Returns:
        The stripped contents of the ``locked`` file (possibly an empty
        string) if the worktree is locked, None if the file does not exist
        or cannot be read
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as fh:
                contents = fh.read()
        except (FileNotFoundError, PermissionError):
            # The file vanished or is unreadable: treat as not locked.
            return None
        return contents.strip()
    return None
707
708
def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The main worktree is always the first entry, followed by one entry for
    every directory under the control directory's worktrees folder whose
    ``gitdir`` file can be read.

    Args:
        repo: The repository to list worktrees for

    Returns:
        A list of WorkTreeInfo objects
    """
    worktrees = []

    # A repository without any commit has an unborn HEAD; report it with
    # head=None instead of failing the whole listing.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None

    # Add main worktree
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True
    else:
        if head_contents.startswith(SYMREF):
            main_wt_info.branch = head_contents[len(SYMREF) :].strip()
        else:
            main_wt_info.detached = True
            main_wt_info.branch = None

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(worktrees_dir):
        return worktrees

    for entry in os.listdir(worktrees_dir):
        worktree_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_path):
            continue

        # Read gitdir to get actual worktree path
        gitdir_path = os.path.join(worktree_path, GITDIR)
        try:
            with open(gitdir_path, "rb") as f:
                gitdir_contents = f.read().strip()
        except (FileNotFoundError, PermissionError):
            # Worktree directory is missing, skip it
            # TODO: Consider adding these as prunable worktrees with a placeholder path
            continue

        # Convert relative path to absolute if needed; relative pointers are
        # resolved against the administrative directory.
        wt_path = os.fsdecode(gitdir_contents)
        if not os.path.isabs(wt_path):
            wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))

        wt_info = WorkTreeInfo(
            path=os.path.dirname(wt_path),  # Remove .git suffix
            bare=False,
            detached=False,
            locked=False,
            prunable=False,
        )

        # A worktree whose directory is gone can be pruned
        if wt_info.path and not os.path.exists(wt_info.path):
            wt_info.prunable = True

        # Read HEAD
        head_path = os.path.join(worktree_path, "HEAD")
        try:
            with open(head_path, "rb") as f:
                head_contents = f.read().strip()
        except (FileNotFoundError, PermissionError):
            wt_info.head = None
            wt_info.branch = None
        else:
            if head_contents.startswith(SYMREF):
                ref_name = head_contents[len(SYMREF) :].strip()
                wt_info.branch = ref_name
                # Resolve ref to get commit sha
                try:
                    wt_info.head = repo.refs[ref_name]
                except KeyError:
                    wt_info.head = None
            else:
                wt_info.detached = True
                wt_info.branch = None
                wt_info.head = head_contents

        # Check if locked; an empty reason still means "locked"
        lock_reason = read_worktree_lock_reason(worktree_path)
        if lock_reason is not None:
            wt_info.locked = True
            wt_info.lock_reason = lock_reason

        worktrees.append(wt_info)

    return worktrees
811
812
def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike,
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    What gets checked out:

    * ``commit`` given: that commit, with a detached HEAD (``branch`` is then
      only used for the "already checked out" check).
    * only ``branch`` given: that branch; it is created at the current HEAD
      if it does not exist yet.
    * neither given: the current HEAD, detached.

    Args:
        repo: The main repository
        path: Path where the new worktree should be created
        branch: Branch to checkout in the new worktree (creates if doesn't exist)
        commit: Specific commit to checkout (results in detached HEAD)
        force: Force creation even if branch is already checked out elsewhere
        detach: Detach HEAD in the new worktree
        exist_ok: If True, do not raise an error if the directory already exists

    Returns:
        The newly created worktree repository

    Raises:
        ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name to a full ref name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        if not branch.startswith(b"refs/heads/"):
            branch = b"refs/heads/" + branch

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from HEAD
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier is the last path component;
    # abspath() drops trailing separators and resolves "." so that it is
    # not empty for paths such as "some/dir/".
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        wt_repo.refs.set_symbolic_ref(b"HEAD", branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo
906
907
def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike, force: bool = False
) -> None:
    """Remove a worktree.

    Deletes the working directory (if it still exists) and the worktree's
    administrative directory inside the main repository.

    Args:
        repo: The main repository
        path: Path to the worktree to remove
        force: Force removal even if the worktree is locked

    Raises:
        ValueError: If the path is the main working tree, the worktree
            doesn't exist, or it is locked and force is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Don't allow removing the main worktree
    if os.path.abspath(path) == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the worktree; raises ValueError("Worktree not found: ...") if the
    # path is not registered.
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # A locked worktree is only removed when forced
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)
976
977
def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Remove administrative directories of worktrees that no longer exist.

    An entry is stale when its ``gitdir`` file cannot be read or points into
    a directory that does not exist. Locked entries are never pruned.

    Args:
        repo: The main repository
        expire: Only prune worktrees older than this many seconds
        dry_run: Don't actually remove anything, just report what would be removed

    Returns:
        List of pruned worktree identifiers
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue
        if os.path.exists(os.path.join(admin_dir, "locked")):
            # Locked worktrees are left alone.
            continue

        # Does the gitdir pointer still lead to an existing worktree?
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                pointer = os.fsdecode(fh.read().strip())
        except (FileNotFoundError, PermissionError):
            stale = True
        else:
            if not os.path.isabs(pointer):
                pointer = os.path.abspath(os.path.join(admin_dir, pointer))
            # The pointer names the .git file; its parent is the worktree.
            stale = not os.path.exists(os.path.dirname(pointer))

        # Entries younger than the expiry window are kept.
        if stale and expire is not None:
            if now - os.stat(admin_dir).st_mtime < expire:
                stale = False

        if not stale:
            continue
        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed
1038
1039
def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike, reason: str | None = None
) -> None:
    """Mark a worktree as locked so that it is not pruned.

    Creates (or truncates) the ``locked`` file in the worktree's
    administrative directory and stores the reason in it, if one is given.

    Args:
        repo: The main repository
        path: Path to the worktree to lock
        reason: Optional reason for locking
    """
    wt_id = _find_worktree_id(repo, path)
    admin_dir = os.path.join(repo.controldir(), WORKTREES, wt_id)

    with open(os.path.join(admin_dir, "locked"), "w") as lock_file:
        if not reason:
            # No reason: the empty file alone marks the lock.
            return
        lock_file.write(reason)
1057
1058
def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike) -> None:
    """Remove the lock from a worktree.

    Does nothing if the worktree is not locked.

    Args:
        repo: The main repository
        path: Path to the worktree to unlock
    """
    wt_id = _find_worktree_id(repo, path)
    lock_file = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    if not os.path.exists(lock_file):
        return
    os.remove(lock_file)
1072
1073
def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike) -> str:
    """Find the worktree identifier for the given path.

    Scans the administrative directories under the control directory's
    worktrees folder and returns the name of the one whose ``gitdir`` file
    points into ``path``.

    Args:
        repo: The main repository
        path: Path to the worktree

    Returns:
        The worktree identifier

    Raises:
        ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    wanted = os.path.abspath(path)
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Only directories are worktree entries. Opening "<file>/gitdir"
            # would raise NotADirectoryError, which is not handled below.
            if not os.path.isdir(worktree_path):
                continue

            gitdir_path = os.path.join(worktree_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
            except (FileNotFoundError, PermissionError):
                continue

            # Relative pointers are resolved against the administrative
            # directory.
            wt_path = os.fsdecode(gitdir_contents)
            if not os.path.isabs(wt_path):
                wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
            wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

            if os.path.abspath(wt_dir) == wanted:
                return entry

    raise ValueError(f"Worktree not found: {path}")
1112
1113
def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike,
    new_path: str | bytes | os.PathLike,
) -> None:
    """Move a worktree to a new location.

    Moves the working directory and rewrites the ``gitdir`` pointer in the
    worktree's administrative directory so that it names the ``.git`` file
    at the new location.

    Args:
        repo: The main repository
        old_path: Current path of the worktree
        new_path: New path for the worktree

    Raises:
        ValueError: If old_path is the main working tree, the worktree
            doesn't exist, or new path already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Update the gitdir pointer in the control directory. Store an absolute
    # path: readers resolve a relative pointer against the administrative
    # directory, not against the current working directory that a relative
    # new_path refers to.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
1157
1158
@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered as a worktree of
    ``repo``. On exit the worktree is removed again and the temporary
    directory is deleted, even if removing the worktree fails.

    Args:
        repo: Dulwich repository object
        prefix: Prefix for the temporary directory name

    Yields:
        The repository object of the new worktree
    """
    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    worktree = None

    try:
        # Add worktree
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)

        yield worktree

    finally:
        try:
            # Clean up worktree registration
            if worktree is not None:
                repo.worktrees.remove(worktree.path)
        finally:
            # Clean up the temporary directory no matter what happened above
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)