Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# worktree.py -- Working tree operations for Git repositories
2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Working tree operations for Git repositories."""
24from __future__ import annotations
# Public API of this module: the names exposed by a star-import.
__all__ = [
    "WorkTree",
    "WorkTreeContainer",
    "WorkTreeInfo",
    "add_worktree",
    "list_worktrees",
    "lock_worktree",
    "move_worktree",
    "prune_worktrees",
    "read_worktree_lock_reason",
    "remove_worktree",
    "repair_worktree",
    "temporary_worktree",
    "unlock_worktree",
]
42import builtins
43import os
44import shutil
45import stat
46import sys
47import tempfile
48import time
49import warnings
50from collections.abc import Callable, Iterable, Iterator, Sequence
51from contextlib import contextmanager
52from pathlib import Path
53from typing import Any
55from .errors import CommitError, HookError
56from .objects import Blob, Commit, ObjectID, Tag, Tree
57from .refs import SYMREF, Ref, local_branch_name
58from .repo import (
59 GITDIR,
60 WORKTREES,
61 Repo,
62 check_user_identity,
63 get_user_identity,
64)
65from .trailers import add_trailer_to_message
class WorkTreeInfo:
    """Snapshot describing one worktree of a repository.

    Attributes:
      path: Filesystem location of the worktree
      head: SHA of the commit HEAD resolves to
      branch: Checked-out branch ref, or None when HEAD is detached
      bare: True when this entry is a bare repository
      detached: True when HEAD is detached
      locked: True when the worktree is locked
      prunable: True when the worktree may be pruned
      lock_reason: Explanation stored with the lock, if any
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: Ref | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the given worktree properties on the instance.

        Args:
          path: Filesystem location of the worktree
          head: SHA of the commit HEAD resolves to
          branch: Checked-out branch ref, or None when HEAD is detached
          bare: True when this entry is a bare repository
          detached: True when HEAD is detached
          locked: True when the worktree is locked
          prunable: True when the worktree may be pruned
          lock_reason: Explanation stored with the lock, if any
        """
        # Identity of the worktree.
        self.path = path
        self.head = head
        self.branch = branch
        # State flags.
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short debugging representation (path, branch, detached)."""
        shown = (
            f"path={self.path!r}",
            f"branch={self.branch!r}",
            f"detached={self.detached}",
        )
        return "WorkTreeInfo(" + ", ".join(shown) + ")"

    def __eq__(self, other: object) -> bool:
        """Compare every attribute with another WorkTreeInfo."""
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        # Compared in declaration order; stops at the first mismatch.
        compared = (
            "path",
            "head",
            "branch",
            "bare",
            "detached",
            "locked",
            "prunable",
            "lock_reason",
        )
        for attr in compared:
            if not getattr(self, attr) == getattr(other, attr):
                return False
        return True

    def open(self) -> WorkTree:
        """Open the repository at ``self.path`` and wrap it in a WorkTree.

        Returns:
          WorkTree object for this worktree

        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)
class WorkTreeContainer:
    """Object-style facade over the module-level worktree functions.

    Each method forwards to the matching ``*_worktree`` function, supplying
    the repository this container was created for (comparable to how
    RefsContainer manages references).
    """

    def __init__(self, repo: Repo) -> None:
        """Remember the repository whose worktrees are managed.

        Args:
          repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        infos = list_worktrees(self._repo)
        return infos

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree at ``path``.

        Args:
          path: Path where the new worktree should be created
          branch: Branch to checkout in the new worktree
          commit: Specific commit to checkout (results in detached HEAD)
          force: Force creation even if branch is already checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: If True, do not raise an error if the directory already exists

        Returns:
          The newly created worktree repository
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Delete the worktree located at ``path``.

        Args:
          path: Path to the worktree to remove
          force: Force removal even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Clean up administrative files of worktrees that no longer exist.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Don't actually remove anything, just report what would be removed

        Returns:
          List of pruned worktree identifiers
        """
        pruned = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
    ) -> None:
        """Relocate a worktree.

        Args:
          old_path: Current path of the worktree
          new_path: New path for the worktree
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Lock a worktree so that it is not pruned.

        Args:
          path: Path to the worktree to lock
          reason: Optional reason for locking
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Release the lock on a worktree.

        Args:
          path: Path to the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def repair(
        self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
    ) -> builtins.list[str]:
        """Repair worktree administrative files.

        Args:
          paths: Optional list of worktree paths to repair. If None, repairs
            connections from the main repository to all linked worktrees.

        Returns:
          List of repaired worktree paths
        """
        repaired = repair_worktree(self._repo, paths=paths)
        return repaired

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield the WorkTreeInfo of each worktree, as returned by ``list()``."""
        for info in self.list():
            yield info
277class WorkTree:
278 """Working tree operations for a Git repository.
280 This class provides methods for working with the working tree,
281 such as staging files, committing changes, and resetting the index.
282 """
284 def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
285 """Initialize a WorkTree for the given repository.
287 Args:
288 repo: The repository this working tree belongs to
289 path: Path to the working tree directory
290 """
291 self._repo = repo
292 raw_path = os.fspath(path)
293 if isinstance(raw_path, bytes):
294 self.path: str = os.fsdecode(raw_path)
295 else:
296 self.path = raw_path
297 self.path = os.path.abspath(self.path)
299 def stage(
300 self,
301 fs_paths: str
302 | bytes
303 | os.PathLike[str]
304 | Iterable[str | bytes | os.PathLike[str]],
305 ) -> None:
306 """Stage a set of paths.
308 Args:
309 fs_paths: List of paths, relative to the repository path
310 """
311 root_path_bytes = os.fsencode(self.path)
313 if isinstance(fs_paths, (str, bytes, os.PathLike)):
314 fs_paths = [fs_paths]
315 fs_paths = list(fs_paths)
317 from .index import (
318 _fs_to_tree_path,
319 blob_from_path_and_stat,
320 index_entry_from_directory,
321 index_entry_from_stat,
322 )
324 index = self._repo.open_index()
325 blob_normalizer = self._repo.get_blob_normalizer()
326 for fs_path in fs_paths:
327 if not isinstance(fs_path, bytes):
328 fs_path = os.fsencode(fs_path)
329 if os.path.isabs(fs_path):
330 raise ValueError(
331 f"path {fs_path!r} should be relative to "
332 "repository root, not absolute"
333 )
334 tree_path = _fs_to_tree_path(fs_path)
335 full_path = os.path.join(root_path_bytes, fs_path)
336 try:
337 st = os.lstat(full_path)
338 except (FileNotFoundError, NotADirectoryError):
339 # File no longer exists
340 try:
341 del index[tree_path]
342 except KeyError:
343 pass # already removed
344 else:
345 if stat.S_ISDIR(st.st_mode):
346 entry = index_entry_from_directory(st, full_path)
347 if entry:
348 index[tree_path] = entry
349 else:
350 try:
351 del index[tree_path]
352 except KeyError:
353 pass
354 elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
355 try:
356 del index[tree_path]
357 except KeyError:
358 pass
359 else:
360 blob = blob_from_path_and_stat(full_path, st)
361 blob = blob_normalizer.checkin_normalize(blob, fs_path)
362 self._repo.object_store.add_object(blob)
363 index[tree_path] = index_entry_from_stat(st, blob.id)
364 index.write()
366 def unstage(self, fs_paths: Sequence[str]) -> None:
367 """Unstage specific file in the index.
369 Args:
370 fs_paths: a list of files to unstage,
371 relative to the repository path.
372 """
373 from .index import IndexEntry, _fs_to_tree_path
375 index = self._repo.open_index()
376 try:
377 commit = self._repo[Ref(b"HEAD")]
378 except KeyError:
379 # no head mean no commit in the repo
380 for fs_path in fs_paths:
381 tree_path = _fs_to_tree_path(fs_path)
382 del index[tree_path]
383 index.write()
384 return
385 else:
386 assert isinstance(commit, Commit), "HEAD must be a commit"
387 tree_id = commit.tree
389 for fs_path in fs_paths:
390 tree_path = _fs_to_tree_path(fs_path)
391 try:
392 tree = self._repo.object_store[tree_id]
393 assert isinstance(tree, Tree)
394 tree_entry = tree.lookup_path(
395 self._repo.object_store.__getitem__, tree_path
396 )
397 except KeyError:
398 # if tree_entry didn't exist, this file was being added, so
399 # remove index entry
400 try:
401 del index[tree_path]
402 continue
403 except KeyError as exc:
404 raise KeyError(f"file '{tree_path.decode()}' not in index") from exc
406 st = None
407 try:
408 st = os.lstat(os.path.join(self.path, fs_path))
409 except FileNotFoundError:
410 pass
412 blob_obj = self._repo[tree_entry[1]]
413 assert isinstance(blob_obj, Blob)
414 blob_size = len(blob_obj.data)
416 index_entry = IndexEntry(
417 ctime=(commit.commit_time, 0),
418 mtime=(commit.commit_time, 0),
419 dev=st.st_dev if st else 0,
420 ino=st.st_ino if st else 0,
421 mode=tree_entry[0],
422 uid=st.st_uid if st else 0,
423 gid=st.st_gid if st else 0,
424 size=blob_size,
425 sha=tree_entry[1],
426 flags=0,
427 extended_flags=0,
428 )
430 index[tree_path] = index_entry
431 index.write()
    def commit(
        self,
        message: str | bytes | Callable[[Any, Commit], bytes] | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = Ref(b"HEAD"),
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        The steps run in this order: pre-commit hook, tree selection,
        identity/timestamp defaults, parent selection, message resolution
        (callables are invoked only after parents are set), optional
        Signed-off-by trailer, commit-msg hook, optional signing, object
        storage, ref update, MERGE_HEAD removal, post-commit hook and
        ``maybe_auto_gc``.

        Args:
          message: Commit message (str, bytes, or a callable that takes
            (repo, commit) and returns the message)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding (defaults to the i18n.commitEncoding
            configuration value when that is set)
          ref: Optional ref to commit to (defaults to HEAD).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: Whether to sign the commit. If None, the commit.gpgSign
            configuration value is used (default False). The key id passed
            to ``Commit.sign`` is read from user.signingkey when signing
            is enabled, and is None if that option is not set.
          signoff: Add Signed-off-by line (DCO) to commit message.
            If None, uses format.signoff config.

        Returns:
          New commit SHA1

        Raises:
          CommitError: If the pre-commit or commit-msg hook fails, or if
            the ref could not be updated atomically
          ValueError: If ``tree`` is not 40 characters long, if no message
            was given, or if a message callable returned None
        """
        # Run the pre-commit hook unless verification was disabled.
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            # No explicit tree: write the current index out as a tree.
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                # Existing ref: its current value becomes the first parent.
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                # Ref does not exist yet: only the merge heads are parents.
                c.parents = merge_heads

        # Handle message after parents are set
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add Signed-off-by trailer
            # Get the committer identity for the signoff
            signoff_identity = committer
            # The trailer helper is fed bytes; str messages are UTF-8 encoded.
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        # Let the commit-msg hook rewrite the message unless hooks are skipped.
        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    # The hook returned nothing: keep the message unchanged.
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                # Ref exists: update it only if it still has the value read here.
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                # NOTE(review): commit_timestamp was defaulted above, so the
                # "else None" arm below looks unreachable -- confirm.
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            except KeyError:
                # A KeyError raised anywhere in the try body lands here and
                # is treated as "ref does not exist yet": the commit is
                # stored with only the merge heads as parents and the ref is
                # created if still absent.
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        # This runs for dangling commits (ref is None) as well.
        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id
689 def reset_index(self, tree: ObjectID | None = None) -> None:
690 """Reset the index back to a specific tree.
692 Args:
693 tree: Tree SHA to reset to, None for current HEAD tree.
694 """
695 from .index import (
696 build_index_from_tree,
697 symlink,
698 validate_path_element_default,
699 validate_path_element_hfs,
700 validate_path_element_ntfs,
701 )
703 if tree is None:
704 head = self._repo[Ref(b"HEAD")]
705 if isinstance(head, Tag):
706 _cls, obj = head.object
707 head = self._repo.get_object(obj)
708 from .objects import Commit
710 assert isinstance(head, Commit)
711 tree = head.tree
712 config = self._repo.get_config()
713 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
714 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
715 validate_path_element = validate_path_element_ntfs
716 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
717 validate_path_element = validate_path_element_hfs
718 else:
719 validate_path_element = validate_path_element_default
720 if config.get_boolean(b"core", b"symlinks", True):
721 symlink_fn = symlink
722 else:
724 def symlink_fn( # type: ignore[misc,unused-ignore]
725 src: str | bytes,
726 dst: str | bytes,
727 target_is_directory: bool = False,
728 *,
729 dir_fd: int | None = None,
730 ) -> None:
731 with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
732 f.write(src)
734 blob_normalizer = self._repo.get_blob_normalizer()
735 return build_index_from_tree(
736 self.path,
737 self._repo.index_path(),
738 self._repo.object_store,
739 tree,
740 honor_filemode=honor_filemode,
741 validate_path_element=validate_path_element,
742 symlink_fn=symlink_fn, # type: ignore[arg-type,unused-ignore]
743 blob_normalizer=blob_normalizer,
744 )
746 def _sparse_checkout_file_path(self) -> str:
747 """Return the path of the sparse-checkout file in this repo's control dir."""
748 return os.path.join(self._repo.controldir(), "info", "sparse-checkout")
750 def configure_for_cone_mode(self) -> None:
751 """Ensure the repository is configured for cone-mode sparse-checkout."""
752 config = self._repo.get_config()
753 config.set((b"core",), b"sparseCheckout", b"true")
754 config.set((b"core",), b"sparseCheckoutCone", b"true")
755 config.write_to_path()
757 def infer_cone_mode(self) -> bool:
758 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
759 config = self._repo.get_config()
760 try:
761 sc_cone = config.get((b"core",), b"sparseCheckoutCone")
762 return sc_cone == b"true"
763 except KeyError:
764 # If core.sparseCheckoutCone is not set, default to False
765 return False
767 def get_sparse_checkout_patterns(self) -> list[str]:
768 """Return a list of sparse-checkout patterns from info/sparse-checkout.
770 Returns:
771 A list of patterns. Returns an empty list if the file is missing.
772 """
773 path = self._sparse_checkout_file_path()
774 try:
775 with open(path, encoding="utf-8") as f:
776 return [line.strip() for line in f if line.strip()]
777 except FileNotFoundError:
778 return []
780 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
781 """Write the given sparse-checkout patterns into info/sparse-checkout.
783 Creates the info/ directory if it does not exist.
785 Args:
786 patterns: A list of gitignore-style patterns to store.
787 """
788 info_dir = os.path.join(self._repo.controldir(), "info")
789 os.makedirs(info_dir, exist_ok=True)
791 path = self._sparse_checkout_file_path()
792 with open(path, "w", encoding="utf-8") as f:
793 for pat in patterns:
794 f.write(pat + "\n")
796 def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
797 """Write the given cone-mode directory patterns into info/sparse-checkout.
799 For each directory to include, add an inclusion line that "undoes" the prior
800 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
801 Never add the same line twice.
802 """
803 patterns = ["/*", "!/*/"]
804 if dirs:
805 for d in dirs:
806 d = d.strip("/")
807 line = f"/{d}/"
808 if d and line not in patterns:
809 patterns.append(line)
810 self.set_sparse_checkout_patterns(patterns)
def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the reason stored in a worktree's ``locked`` file.

    Args:
      worktree_path: Path to the worktree's administrative directory

    Returns:
      The stripped contents of the ``locked`` file (possibly an empty
      string), or None if the file does not exist or cannot be opened
      because it vanished or access was denied
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as handle:
                contents = handle.read()
        except (FileNotFoundError, PermissionError):
            return None
        return contents.strip()
    return None
def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The first entry always describes the main worktree. It is followed by
    one entry per directory below the repository's worktrees directory whose
    ``gitdir`` file is readable. Linked worktrees whose recorded path no
    longer exists are reported with ``prunable=True``.

    Args:
      repo: The repository to list worktrees for

    Returns:
      A list of WorkTreeInfo objects. ``head`` is None for any worktree
      whose HEAD cannot be resolved to a commit.
    """
    worktrees = []

    # Add main worktree. A HEAD that does not resolve (presumably an unborn
    # branch in a repository without commits) is reported as head=None, the
    # same way unresolvable refs of linked worktrees are handled below,
    # instead of making the whole listing fail.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
            if head_contents.startswith(SYMREF):
                ref_name = Ref(head_contents[len(SYMREF) :].strip())
                main_wt_info.branch = ref_name
            else:
                main_wt_info.detached = True
                main_wt_info.branch = None
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_path):
                continue

            wt_info = WorkTreeInfo(
                path="",  # Will be set below
                bare=False,
                detached=False,
                locked=False,
                prunable=False,
            )

            # Read gitdir to get actual worktree path
            gitdir_path = os.path.join(worktree_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    # Convert relative path to absolute if needed
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_info.path = os.path.dirname(wt_path)  # Remove .git suffix
            except (FileNotFoundError, PermissionError):
                # Worktree directory is missing, skip it
                # TODO: Consider adding these as prunable worktrees with a placeholder path
                continue

            # Check if worktree path exists
            if wt_info.path and not os.path.exists(wt_info.path):
                wt_info.prunable = True

            # Read HEAD
            head_path = os.path.join(worktree_path, "HEAD")
            try:
                with open(head_path, "rb") as f:
                    head_contents = f.read().strip()
                    if head_contents.startswith(SYMREF):
                        ref_name = Ref(head_contents[len(SYMREF) :].strip())
                        wt_info.branch = ref_name
                        # Resolve ref to get commit sha
                        try:
                            wt_info.head = repo.refs[ref_name]
                        except KeyError:
                            wt_info.head = None
                    else:
                        # HEAD holds a SHA directly: detached checkout.
                        wt_info.detached = True
                        wt_info.branch = None
                        wt_info.head = head_contents
            except (FileNotFoundError, PermissionError):
                wt_info.head = None
                wt_info.branch = None

            # Check if locked
            lock_reason = read_worktree_lock_reason(worktree_path)
            if lock_reason is not None:
                wt_info.locked = True
                wt_info.lock_reason = lock_reason

            worktrees.append(wt_info)

    return worktrees
def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    What gets checked out is decided in this order:

    1. ``commit`` given: that commit, with a detached HEAD. ``branch`` is
       then only used for the "already checked out" check and is neither
       created nor pointed to.
    2. ``branch`` given: that branch; if it does not exist it is created at
       the main repository's current HEAD.
    3. Neither given: the main repository's current HEAD, detached.

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (creates if doesn't exist)
      commit: Specific commit to checkout (results in detached HEAD)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists

    Returns:
      The newly created worktree repository

    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        branch = local_branch_name(branch)

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from HEAD. ``commit`` is
            # known to be None in this arm, so no other start point applies.
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier comes from the absolute path
    # so that a trailing separator or a relative form such as "." does not
    # produce an empty or meaningless name.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        from dulwich.refs import HEADREF

        wt_repo.refs.set_symbolic_ref(HEADREF, branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo
def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Delete a linked worktree and its administrative directory.

    The worktree is identified by comparing ``path`` with the location
    recorded in each ``gitdir`` file below the repository's worktrees
    directory.

    Args:
      repo: The main repository
      path: Path to the worktree to remove
      force: Remove the worktree even if it is locked

    Raises:
      ValueError: If ``path`` is the main working tree, no matching worktree
        is registered, or the worktree is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Don't allow removing the main worktree
    target = os.path.abspath(path)
    if target == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Look up the administrative entry whose gitdir points at the target.
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    worktree_id = None
    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            admin_dir = os.path.join(worktrees_dir, entry)
            try:
                with open(os.path.join(admin_dir, GITDIR), "rb") as f:
                    recorded = os.fsdecode(f.read().strip())
                if not os.path.isabs(recorded):
                    recorded = os.path.abspath(os.path.join(admin_dir, recorded))
                # The gitdir file names the worktree's .git entry; its
                # parent directory is the worktree itself.
                candidate = os.path.abspath(os.path.dirname(recorded))
                if candidate == target:
                    worktree_id = entry
                    break
            except (FileNotFoundError, PermissionError):
                continue

    if worktree_id is None:
        raise ValueError(f"Worktree not found: {path}")

    worktree_control_dir = os.path.join(worktrees_dir, worktree_id)

    # A locked worktree may only be removed when forced.
    if os.path.exists(os.path.join(worktree_control_dir, "locked")) and not force:
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree (when not forcing)

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)
def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Drop administrative directories whose worktree no longer exists.

    An entry is considered stale when its ``gitdir`` file cannot be read or
    when the directory containing the recorded ``.git`` path is missing.
    Locked entries are never pruned.

    Args:
      repo: The main repository
      expire: When given, only prune entries whose administrative directory
        was last modified at least this many seconds ago
      dry_run: Report what would be pruned without deleting anything

    Returns:
      List of pruned worktree identifiers
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue
        # Locked worktrees are exempt from pruning
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        # Work out whether the recorded worktree location still exists
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                recorded = os.fsdecode(fh.read().strip())
            if not os.path.isabs(recorded):
                recorded = os.path.abspath(os.path.join(admin_dir, recorded))
            # dirname strips the trailing ".git" component
            stale = not os.path.exists(os.path.dirname(recorded))
        except (FileNotFoundError, PermissionError):
            stale = True

        if not stale:
            continue

        # Entries younger than the expiry window are kept
        if expire is not None and now - os.stat(admin_dir).st_mtime < expire:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed
def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Mark a worktree as locked so that pruning skips it.

    Creates (or truncates) the ``locked`` file in the worktree's control
    directory and stores the reason in it when one is supplied.

    Args:
      repo: The main repository
      path: Path to the worktree to lock
      reason: Optional reason for locking
    """
    identifier = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, identifier, "locked")

    # An empty marker file is enough to lock; the reason is optional payload
    with open(marker, "w") as fh:
        fh.write(reason or "")
def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Unlock a worktree.

    Removes the ``locked`` marker from the worktree's control directory.
    Unlocking a worktree that is not locked is a no-op.

    Args:
      repo: The main repository
      path: Path to the worktree to unlock
    """
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    lock_path = os.path.join(worktree_control_dir, "locked")
    # Remove directly instead of checking for existence first, so a marker
    # deleted concurrently cannot turn the unlock into an error.
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        pass
def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Find the worktree identifier for the given path.

    Scans the repository's worktree control directories and compares the
    location recorded in each ``gitdir`` file with ``path``.

    Args:
      repo: The main repository
      path: Path to the worktree

    Returns:
      The worktree identifier

    Raises:
      ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        target = os.path.abspath(path)
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
            except (
                FileNotFoundError,
                # ``entry`` is a stray regular file rather than a directory
                NotADirectoryError,
                # ``gitdir`` itself is a directory (PermissionError on Windows)
                IsADirectoryError,
                PermissionError,
            ):
                # A malformed entry must not hide the remaining worktrees
                continue

            wt_path = os.fsdecode(gitdir_contents)
            # Relative entries are relative to the control directory
            if not os.path.isabs(wt_path):
                wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
            wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

            if os.path.abspath(wt_dir) == target:
                return entry

    raise ValueError(f"Worktree not found: {path}")
def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
) -> None:
    """Move a worktree to a new location.

    The worktree directory is moved and the ``gitdir`` pointer in the
    repository's worktree control directory is rewritten to the new location.

    Args:
      repo: The main repository
      old_path: Current path of the worktree
      new_path: New path for the worktree

    Raises:
      ValueError: If ``old_path`` is the main working tree or is not a
        registered worktree, or if ``new_path`` already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Record an absolute location: relative ``gitdir`` entries are resolved
    # against the control directory when read back, not against the current
    # working directory that a relative ``new_path`` refers to.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Update the gitdir pointer in the control directory. The worktree's own
    # .git file is left alone because the control directory has not moved.
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
def _repair_control_pointer(path: str | bytes | os.PathLike[str]) -> str | None:
    """Re-point a worktree's control-directory ``gitdir`` file at ``path``.

    Args:
      path: Current location of the linked worktree

    Returns:
      The absolute worktree path if the pointer was rewritten, or None if the
      control directory named by the worktree has no ``gitdir`` file

    Raises:
      ValueError: If ``path`` has no readable, well-formed ``.git`` file
    """
    path_str = os.fspath(path)
    if isinstance(path_str, bytes):
        path_str = os.fsdecode(path_str)
    path_str = os.path.abspath(path_str)

    # Check if this is a linked worktree
    gitdir_file = os.path.join(path_str, ".git")
    if not os.path.exists(gitdir_file):
        raise ValueError(f"Not a valid worktree: {path_str}")

    # Read the .git file to get the worktree control directory
    try:
        with open(gitdir_file, "rb") as f:
            gitdir_content = f.read().strip()
        if not gitdir_content.startswith(b"gitdir: "):
            raise ValueError(f"Invalid .git file in worktree: {path_str}")
        worktree_control_path = gitdir_content[8:].decode()
    except (
        FileNotFoundError,
        # ``.git`` is a directory, i.e. not a linked worktree; POSIX reports
        # this as IsADirectoryError where Windows reports PermissionError.
        IsADirectoryError,
        PermissionError,
        UnicodeDecodeError,
    ) as e:
        raise ValueError(f"Cannot read .git file in worktree: {path_str}") from e

    # Make the path absolute if it's relative
    if not os.path.isabs(worktree_control_path):
        worktree_control_path = os.path.abspath(
            os.path.join(path_str, worktree_control_path)
        )

    # Only rewrite a pointer that already exists in the control directory
    gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
    if not os.path.exists(gitdir_pointer):
        return None
    with open(gitdir_pointer, "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
    return path_str


def _repair_gitfile(worktree_control_path: str) -> str | None:
    """Make a worktree's ``.git`` file point back at its control directory.

    Args:
      worktree_control_path: The worktree's control directory

    Returns:
      The worktree directory if its ``.git`` file was rewritten, or None if
      nothing needed (or could be) repaired
    """
    # Read the gitdir file to find where the worktree thinks it is
    try:
        with open(os.path.join(worktree_control_path, GITDIR), "rb") as f:
            gitfile_location = os.fsdecode(f.read().strip())
    except (FileNotFoundError, IsADirectoryError, PermissionError):
        # Can't repair if we can't read the gitdir file
        return None
    if not gitfile_location:
        return None

    # Relative entries are relative to the control directory, matching how
    # the other worktree operations interpret the gitdir file.
    if not os.path.isabs(gitfile_location):
        gitfile_location = os.path.abspath(
            os.path.join(worktree_control_path, gitfile_location)
        )

    # Get the worktree directory (remove .git suffix)
    worktree_path = os.path.dirname(gitfile_location)

    try:
        with open(gitfile_location, "rb") as f:
            content = f.read().strip()
        if not content.startswith(b"gitdir: "):
            return None

        current_pointer = content[8:].decode()
        if not os.path.isabs(current_pointer):
            current_pointer = os.path.abspath(
                os.path.join(worktree_path, current_pointer)
            )

        # Nothing to do when it already points to the right place
        if os.path.abspath(current_pointer) == os.path.abspath(
            worktree_control_path
        ):
            return None

        # Update the .git file to point to the correct location
        with open(gitfile_location, "wb") as wf:
            wf.write(b"gitdir: " + os.fsencode(worktree_control_path) + b"\n")
    except (
        FileNotFoundError,
        IsADirectoryError,
        PermissionError,
        UnicodeDecodeError,
    ):
        # Missing or unreadable .git file: leave this worktree untouched
        return None
    return worktree_path


def repair_worktree(
    repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
) -> list[str]:
    """Repair worktree administrative files.

    This repairs the connection between worktrees and the main repository
    when they have been moved or become corrupted.

    Args:
      repo: The main repository
      paths: Optional list of worktree paths to repair. Each worktree's
        control-directory ``gitdir`` pointer is rewritten to the given
        location. If None or empty, the ``.git`` file of every linked
        worktree is instead checked and, if needed, re-pointed at its
        control directory in the main repository.

    Returns:
      List of repaired worktree paths

    Raises:
      ValueError: If a specified path is not a valid worktree
    """
    repaired: list[str] = []

    if paths:
        # Repair specific worktrees
        for path in paths:
            fixed = _repair_control_pointer(path)
            if fixed is not None:
                repaired.append(fixed)
        return repaired

    # Repair from main repository to all linked worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(worktrees_dir):
        return repaired

    for entry in os.listdir(worktrees_dir):
        worktree_control_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_control_path):
            continue
        fixed = _repair_gitfile(worktree_control_path)
        if fixed is not None:
            repaired.append(fixed)

    return repaired
@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered as a worktree of
    ``repo``. On exit the worktree registration is removed and the directory
    is deleted, even if removing the registration fails.

    Args:
      repo: Dulwich repository object
      prefix: Prefix for the temporary directory name

    Yields:
      The object returned by ``repo.worktrees.add`` for the new worktree
    """
    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    try:
        # Add worktree; the directory already exists, hence exist_ok
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)
        try:
            yield worktree
        finally:
            # Clean up worktree registration
            repo.worktrees.remove(worktree.path)
    finally:
        # Runs even when the registration cleanup above raises, so the
        # temporary directory is never left behind.
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)