Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# worktree.py -- Working tree operations for Git repositories
2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Working tree operations for Git repositories."""
24from __future__ import annotations
26import builtins
27import os
28import shutil
29import stat
30import sys
31import tempfile
32import time
33import warnings
34from collections.abc import Iterable, Iterator, Sequence
35from contextlib import contextmanager
36from pathlib import Path
37from typing import Any, Callable, Union
39from .errors import CommitError, HookError
40from .objects import Blob, Commit, ObjectID, Tag, Tree
41from .refs import SYMREF, Ref, local_branch_name
42from .repo import (
43 GITDIR,
44 WORKTREES,
45 Repo,
46 check_user_identity,
47 get_user_identity,
48)
49from .trailers import add_trailer_to_message
class WorkTreeInfo:
    """Plain record describing one worktree of a repository.

    Attributes:
      path: Path to the worktree
      head: SHA of the commit HEAD currently resolves to, if any
      branch: Ref HEAD points at (None when detached)
      bare: Whether this is a bare repository
      detached: Whether HEAD is detached
      locked: Whether the worktree is locked
      prunable: Whether the worktree can be pruned
      lock_reason: Reason recorded for the lock (if locked)
    """

    # Attributes that take part in equality, in comparison order.
    _COMPARED_FIELDS = (
        "path",
        "head",
        "branch",
        "bare",
        "detached",
        "locked",
        "prunable",
        "lock_reason",
    )

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: bytes | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the given values on the instance unchanged.

        Args:
          path: Path to the worktree
          head: SHA of the commit HEAD currently resolves to, if any
          branch: Ref HEAD points at (None when detached)
          bare: Whether this is a bare repository
          detached: Whether HEAD is detached
          locked: Whether the worktree is locked
          prunable: Whether the worktree can be pruned
          lock_reason: Reason recorded for the lock (if locked)
        """
        self.path, self.head, self.branch = path, head, branch
        self.bare, self.detached = bare, detached
        self.locked, self.prunable = locked, prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short representation showing path, branch and detached."""
        shown = (
            f"path={self.path!r}",
            f"branch={self.branch!r}",
            f"detached={self.detached}",
        )
        return "WorkTreeInfo(" + ", ".join(shown) + ")"

    def __eq__(self, other: object) -> bool:
        """Compare all attributes against another WorkTreeInfo.

        Returns NotImplemented for objects that are not WorkTreeInfo
        instances so Python can try the reflected comparison.
        """
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        return all(
            getattr(self, name) == getattr(other, name)
            for name in self._COMPARED_FIELDS
        )

    def open(self) -> WorkTree:
        """Open the repository at ``self.path`` and wrap it in a WorkTree.

        Returns:
          WorkTree object for this worktree

        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)
class WorkTreeContainer:
    """Facade over the module-level worktree functions for one repository.

    Every method forwards to the matching ``*_worktree(s)`` function with
    the repository bound as the first argument, much like RefsContainer
    does for references.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
          repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        owner = self._repo
        return list_worktrees(owner)

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree; see ``add_worktree``.

        Args:
          path: Path where the new worktree should be created
          branch: Branch to checkout in the new worktree
          commit: Specific commit to checkout (results in detached HEAD)
          force: Force creation even if branch is already checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: If True, do not raise an error if the directory already exists

        Returns:
          The newly created worktree repository
        """
        owner = self._repo
        created = add_worktree(
            owner,
            path,
            branch=branch,
            commit=commit,
            force=force,
            detach=detach,
            exist_ok=exist_ok,
        )
        return created

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Delete a worktree; see ``remove_worktree``.

        Args:
          path: Path to the worktree to remove
          force: Force removal even if there are local changes
        """
        owner = self._repo
        remove_worktree(owner, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Prune administrative files of missing worktrees.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Don't actually remove anything, just report what would be removed

        Returns:
          List of pruned worktree identifiers
        """
        owner = self._repo
        return prune_worktrees(owner, expire=expire, dry_run=dry_run)

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
    ) -> None:
        """Relocate a worktree; see ``move_worktree``.

        Args:
          old_path: Current path of the worktree
          new_path: New path for the worktree
        """
        owner = self._repo
        move_worktree(owner, old_path, new_path)

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Lock a worktree so that it is not pruned.

        Args:
          path: Path to the worktree to lock
          reason: Optional reason for locking
        """
        owner = self._repo
        lock_worktree(owner, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Remove the lock from a worktree.

        Args:
          path: Path to the worktree to unlock
        """
        owner = self._repo
        unlock_worktree(owner, path)

    def repair(
        self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
    ) -> builtins.list[str]:
        """Repair worktree administrative files.

        Args:
          paths: Optional list of worktree paths to repair. If None, repairs
            connections from the main repository to all linked worktrees.

        Returns:
          List of repaired worktree paths
        """
        owner = self._repo
        return repair_worktree(owner, paths=paths)

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield the WorkTreeInfo of each worktree, as returned by ``list``."""
        for info in self.list():
            yield info
class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, resetting the index and
    managing the sparse-checkout pattern file.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
          repo: The repository this working tree belongs to
          path: Path to the working tree directory; bytes paths are decoded
            with the filesystem encoding and the result is made absolute
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            raw_path = os.fsdecode(raw_path)
        self.path: str = os.path.abspath(raw_path)

    @staticmethod
    def _drop_index_entry(index: Any, tree_path: bytes) -> None:
        """Delete ``tree_path`` from ``index``; a missing entry is not an error."""
        try:
            del index[tree_path]
        except KeyError:
            pass  # already removed

    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
    ) -> None:
        """Stage a set of paths.

        For each path the index entry is refreshed from the file system:
        regular files and symlinks are hashed (after check-in normalization)
        and added, directories are handed to ``index_entry_from_directory``,
        and paths that are missing or of any other file type are removed
        from the index.

        Args:
          fs_paths: A single path or an iterable of paths, relative to the
            repository path

        Raises:
          ValueError: If one of the paths is absolute
        """
        root_path_bytes = os.fsencode(self.path)

        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                self._drop_index_entry(index, tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    self._drop_index_entry(index, tree_path)
            elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Neither a regular file nor a symlink: not stored in the index.
                self._drop_index_entry(index, tree_path)
            else:
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(self, fs_paths: Sequence[str]) -> None:
        """Unstage specific files in the index.

        Paths that exist in the HEAD tree get their index entry reset to the
        HEAD version; paths that do not exist there are removed from the
        index. When the repository has no HEAD, every path is simply removed.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: If a path is neither in the HEAD tree nor in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()
        try:
            commit = self._repo[b"HEAD"]
        except KeyError:
            # no head mean no commit in the repo
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # Stat data is optional: the file may be absent from disk.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            # Rebuild the entry from the HEAD tree; timestamps come from the
            # commit, device/inode/owner from the on-disk file when present.
            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding (defaults to the i18n.commitEncoding setting,
            if configured)
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: Whether to sign the commit. If None, the commit.gpgSign
            setting decides (default False). The key is always taken from
            the user.signingkey setting; when that is unset ``None`` is
            passed to ``Commit.sign``.
          signoff: Add Signed-off-by line (DCO) to commit message.
            If None, uses format.signoff config.

        Returns:
          New commit SHA1

        Raises:
          CommitError: If the pre-commit or commit-msg hook fails, or if the
            ref changed while the commit was being created
          ValueError: If ``tree`` is not 40 characters long, or no commit
            message is available
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                # The ref does not exist yet: only merge heads are parents.
                c.parents = merge_heads

        # Handle message after parents are set
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add Signed-off-by trailer
            # Get the committer identity for the signoff
            signoff_identity = committer
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    # Hook produced no replacement message; keep ours.
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                # Compare-and-swap: only move the ref if it still equals
                # the value just read.
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            except KeyError:
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: bytes | None = None) -> None:
        """Reset the index back to a specific tree.

        The path-element validator is chosen from the ``core.protectNTFS``
        and ``core.protectHFS`` settings (defaulting to on for Windows and
        macOS respectively). When ``core.symlinks`` is false, symlinks are
        written as plain files containing the link target.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[b"HEAD"]
            if isinstance(head, Tag):
                # HEAD names a tag object; follow it to the tagged object.
                _cls, obj = head.object
                head = self._repo.get_object(obj)

            assert isinstance(head, Commit)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The option name is given without the section prefix, like the
        # ``filemode`` and ``symlinks`` lookups; a ``core.``-prefixed name
        # would never match the user's [core] setting.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:

            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: Union[str, bytes],
                dst: Union[str, bytes],
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                # Write the link target as file content; binary mode when
                # the target is bytes.
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer()
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Set core.sparseCheckout and core.sparseCheckoutCone to true and save."""
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is exactly b'true' in config.

        Any other value, or a missing setting, yields False.
        """
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False
        return sc_cone == b"true"

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Lines are stripped of surrounding whitespace and blank lines are
        dropped.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                stripped = (line.strip() for line in f)
                return [pattern for pattern in stripped if pattern]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist. Each pattern is
        written on its own line, replacing any previous file content.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(pat + "\n" for pat in patterns)

    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` and ``!/*/``. For each directory
        to include, a ``/<dir>/`` line is appended that re-includes that
        directory and everything under it. Leading and trailing slashes on
        the input are ignored, empty names are skipped, and the same line is
        never added twice.

        Args:
          dirs: Directories to include; None or empty writes only the two
            base patterns.
        """
        patterns = ["/*", "!/*/"]
        for d in dirs or ():
            d = d.strip("/")
            line = f"/{d}/"
            if d and line not in patterns:
                patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)
def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the reason stored in a worktree's ``locked`` file.

    Args:
      worktree_path: Path to the worktree's administrative directory

    Returns:
      The whitespace-stripped content of the ``locked`` file (an empty
      string when the file is empty), or None when the file does not exist
      or cannot be read.
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as handle:
                reason = handle.read()
        except (FileNotFoundError, PermissionError):
            # Vanished or became unreadable after the existence check.
            return None
        return reason.strip()
    return None
def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The first entry always describes the main worktree. It is followed by
    one entry per directory under the repository's worktrees directory
    whose ``gitdir`` file can be read.

    Args:
      repo: The repository to list worktrees for

    Returns:
      A list of WorkTreeInfo objects
    """
    worktrees = []

    # An unborn HEAD (no commit yet) cannot be resolved; report it as None,
    # the same way unresolvable refs of linked worktrees are reported below.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None

    # Add main worktree
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True
    else:
        if head_contents.startswith(SYMREF):
            main_wt_info.branch = head_contents[len(SYMREF) :].strip()
        else:
            main_wt_info.detached = True
            main_wt_info.branch = None

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(worktrees_dir):
        return worktrees

    for entry in os.listdir(worktrees_dir):
        worktree_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_path):
            continue

        wt_info = WorkTreeInfo(
            path="",  # Will be set below
            bare=False,
            detached=False,
            locked=False,
            prunable=False,
        )

        # Read gitdir to get actual worktree path
        gitdir_path = os.path.join(worktree_path, GITDIR)
        try:
            with open(gitdir_path, "rb") as f:
                gitdir_contents = f.read().strip()
                # Convert relative path to absolute if needed
                wt_path = os.fsdecode(gitdir_contents)
                if not os.path.isabs(wt_path):
                    wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                wt_info.path = os.path.dirname(wt_path)  # Remove .git suffix
        except (FileNotFoundError, PermissionError):
            # Worktree directory is missing, skip it
            # TODO: Consider adding these as prunable worktrees with a placeholder path
            continue

        # Check if worktree path exists
        if wt_info.path and not os.path.exists(wt_info.path):
            wt_info.prunable = True

        # Read HEAD
        head_path = os.path.join(worktree_path, "HEAD")
        try:
            with open(head_path, "rb") as f:
                head_contents = f.read().strip()
        except (FileNotFoundError, PermissionError):
            wt_info.head = None
            wt_info.branch = None
        else:
            if head_contents.startswith(SYMREF):
                ref_name = head_contents[len(SYMREF) :].strip()
                wt_info.branch = ref_name
                # Resolve ref to get commit sha
                try:
                    wt_info.head = repo.refs[ref_name]
                except KeyError:
                    wt_info.head = None
            else:
                # HEAD holds a raw SHA: detached.
                wt_info.detached = True
                wt_info.branch = None
                wt_info.head = head_contents

        # Check if locked; an empty reason still counts as locked.
        lock_reason = read_worktree_lock_reason(worktree_path)
        if lock_reason is not None:
            wt_info.locked = True
            wt_info.lock_reason = lock_reason

        worktrees.append(wt_info)

    return worktrees
def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    What gets checked out is decided in this order: an explicit ``commit``
    (always a detached HEAD), otherwise ``branch`` (created at the current
    HEAD when it does not exist yet), otherwise the current HEAD as a
    detached checkout.

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (creates if doesn't exist)
      commit: Specific commit to checkout (results in detached HEAD)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists

    Returns:
      The newly created worktree repository

    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        branch = local_branch_name(branch)

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from HEAD. (No commit was
            # given on this path, so there is nothing else to start from.)
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier is the final path component;
    # normalising first keeps it non-empty for paths with a trailing
    # separator and meaningful for ".".
    identifier = os.path.basename(os.path.abspath(path))

    from .repo import Repo as RepoClass

    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        wt_repo.refs.set_symbolic_ref(b"HEAD", branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo
def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Remove a worktree.

    Deletes both the working directory at ``path`` (if it exists) and the
    worktree's administrative directory inside the main repository.

    Args:
      repo: The main repository
      path: Path to the worktree to remove
      force: Remove the worktree even if it is locked

    Raises:
      ValueError: If ``path`` is the main working tree, no worktree is
        registered for it, or the worktree is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    target = os.path.abspath(path)

    # Don't allow removing the main worktree
    if target == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the administrative entry whose gitdir file points at the target.
    worktree_id = None
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Stray files are not worktree entries; opening "<file>/gitdir"
            # would raise NotADirectoryError.
            if not os.path.isdir(worktree_path):
                continue
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                    if os.path.abspath(wt_dir) == target:
                        worktree_id = entry
                        break
            except (FileNotFoundError, PermissionError):
                continue

    if worktree_id is None:
        raise ValueError(f"Worktree not found: {path}")

    worktree_control_dir = os.path.join(worktrees_dir, worktree_id)

    # A locked worktree may only be removed with force.
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)
def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Drop administrative entries whose worktree directory has vanished.

    An entry is considered stale when its ``gitdir`` pointer cannot be read
    or when the directory containing the path it names does not exist.
    Entries carrying a ``locked`` marker are never pruned.

    Args:
      repo: The main repository
      expire: Only prune entries whose control directory was modified at
        least this many seconds ago
      dry_run: Report what would be pruned without deleting anything

    Returns:
      Identifiers of the pruned (or, with ``dry_run``, prunable) worktrees
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        control_dir = os.path.join(admin_root, name)
        if not os.path.isdir(control_dir):
            continue
        # Locked worktrees are exempt from pruning
        if os.path.exists(os.path.join(control_dir, "locked")):
            continue

        try:
            with open(os.path.join(control_dir, GITDIR), "rb") as fh:
                target = os.fsdecode(fh.read().strip())
            # Relative pointers are resolved against the control directory
            if not os.path.isabs(target):
                target = os.path.abspath(os.path.join(control_dir, target))
            # The pointer names the worktree's .git file; its parent is the
            # worktree directory itself
            stale = not os.path.exists(os.path.dirname(target))
        except (FileNotFoundError, PermissionError):
            stale = True

        if not stale:
            continue

        # Honour the grace period, measured from the control dir's mtime
        if expire is not None and now - os.stat(control_dir).st_mtime < expire:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(control_dir)

    return removed
def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Mark a worktree as locked so that it is not pruned.

    Creates (or truncates) the ``locked`` marker file in the worktree's
    control directory and stores the reason in it, if one is given.

    Args:
      repo: The main repository
      path: Path to the worktree to lock
      reason: Optional reason for locking
    """
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    # An empty marker file is enough to signal the lock
    with open(marker, "w") as fh:
        fh.write(reason or "")
def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Remove the lock marker from a worktree, if it has one.

    Args:
      repo: The main repository
      path: Path to the worktree to unlock
    """
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    # Unlocking a worktree that is not locked is a no-op
    if not os.path.exists(marker):
        return
    os.remove(marker)
def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Find the worktree identifier for the given path.

    Scans the repository's worktrees directory and compares the location
    recorded in each entry's ``gitdir`` file with ``path``.

    Args:
      repo: The main repository
      path: Path to the worktree

    Returns:
      The worktree identifier (the name of its control directory)

    Raises:
      ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        wanted = os.path.abspath(path)
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Skip stray non-directory entries: opening "<file>/gitdir"
            # raises NotADirectoryError on POSIX, which would otherwise
            # abort the whole lookup.
            if not os.path.isdir(worktree_path):
                continue

            try:
                with open(os.path.join(worktree_path, GITDIR), "rb") as f:
                    wt_path = os.fsdecode(f.read().strip())
            except (FileNotFoundError, PermissionError):
                # Unreadable or missing pointer: cannot be the one we want
                continue

            # Relative pointers are resolved against the control directory
            if not os.path.isabs(wt_path):
                wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
            wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

            if os.path.abspath(wt_dir) == wanted:
                return entry

    raise ValueError(f"Worktree not found: {path}")
def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
) -> None:
    """Move a worktree to a new location.

    The worktree directory is moved on disk and the ``gitdir`` pointer in
    its control directory is rewritten to name the new location.

    Args:
      repo: The main repository
      old_path: Current path of the worktree
      new_path: New path for the worktree

    Raises:
      ValueError: If ``old_path`` is the main working tree or is not a
        registered worktree, or if ``new_path`` already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Record the new location as an absolute path. Readers of the gitdir
    # pointer resolve relative values against the control directory, not
    # the current working directory, so storing a caller-relative path
    # would make the moved worktree unfindable.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")

    # Update the gitdir pointer in the control directory
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
def repair_worktree(
    repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
) -> list[str]:
    """Re-establish the links between the repository and its worktrees.

    With ``paths``, each worktree's ``.git`` file is read to locate its
    control directory, and that directory's ``gitdir`` pointer is rewritten
    to name the worktree's current location. Without ``paths``, every
    control directory is visited and the ``.git`` file it points at is
    rewritten if it does not point back at that control directory.

    Args:
      repo: The main repository
      paths: Optional list of worktree paths to repair. If None (or empty),
        repairs connections from the main repository to all linked
        worktrees.

    Returns:
      List of repaired worktree paths

    Raises:
      ValueError: If a specified path is not a valid worktree
    """
    marker = b"gitdir: "
    fixed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)

    if paths:
        # Direction 1: worktree -> control directory
        for raw in paths:
            location = os.fspath(raw)
            if isinstance(location, bytes):
                location = os.fsdecode(location)
            location = os.path.abspath(location)

            dot_git = os.path.join(location, ".git")
            if not os.path.exists(dot_git):
                raise ValueError(f"Not a valid worktree: {location}")

            try:
                with open(dot_git, "rb") as fh:
                    payload = fh.read().strip()
                # None flags a file that lacks the "gitdir: " prefix
                control = (
                    payload[len(marker) :].decode()
                    if payload.startswith(marker)
                    else None
                )
            except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
                raise ValueError(
                    f"Cannot read .git file in worktree: {location}"
                ) from e
            if control is None:
                raise ValueError(f"Invalid .git file in worktree: {location}")

            # Relative pointers are resolved against the worktree itself
            if not os.path.isabs(control):
                control = os.path.abspath(os.path.join(location, control))

            back_pointer = os.path.join(control, GITDIR)
            if not os.path.exists(back_pointer):
                # Nothing to update; this worktree is not reported
                continue
            with open(back_pointer, "wb") as fh:
                fh.write(os.fsencode(dot_git) + b"\n")
            fixed.append(location)

        return fixed

    # Direction 2: control directories -> worktrees
    if not os.path.isdir(admin_root):
        return fixed

    for name in os.listdir(admin_root):
        control = os.path.join(admin_root, name)
        if not os.path.isdir(control):
            continue

        # Where does this entry believe the worktree's .git file lives?
        try:
            with open(os.path.join(control, GITDIR), "rb") as fh:
                recorded = os.fsdecode(fh.read().strip())
        except (FileNotFoundError, PermissionError):
            # Can't repair if we can't read the gitdir file
            continue

        if not os.path.exists(recorded):
            continue
        checkout = os.path.dirname(recorded)  # strip the trailing ".git"

        try:
            with open(recorded, "rb") as fh:
                payload = fh.read().strip()
            if not payload.startswith(marker):
                continue

            target = payload[len(marker) :].decode()
            if not os.path.isabs(target):
                target = os.path.abspath(os.path.join(checkout, target))

            # Already pointing back at this control directory: leave it
            if os.path.abspath(target) == os.path.abspath(control):
                continue

            with open(recorded, "wb") as fh:
                fh.write(marker + os.fsencode(control) + b"\n")
            fixed.append(checkout)
        except (PermissionError, UnicodeDecodeError):
            continue

    return fixed
@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered through
    ``repo.worktrees.add``. On exit the worktree is unregistered through
    ``repo.worktrees.remove`` and the directory is deleted. The directory
    is deleted even when unregistering fails.

    Args:
      repo: Dulwich repository object
      prefix: Prefix for the temporary directory name

    Yields:
      The object returned by ``repo.worktrees.add`` for the new worktree
    """
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    try:
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)
        try:
            yield worktree
        finally:
            # Unregister whenever registration succeeded; do not rely on
            # the truthiness of the returned object.
            repo.worktrees.remove(worktree.path)
    finally:
        # The outer finally guarantees the directory does not leak even if
        # adding or removing the worktree raised. Removal of the worktree
        # may already have deleted it.
        if Path(temp_dir).exists():
            shutil.rmtree(temp_dir)