Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# worktree.py -- Working tree operations for Git repositories
2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Working tree operations for Git repositories."""
24from __future__ import annotations
26__all__ = [
27 "WorkTree",
28 "WorkTreeContainer",
29 "WorkTreeInfo",
30 "add_worktree",
31 "list_worktrees",
32 "lock_worktree",
33 "move_worktree",
34 "prune_worktrees",
35 "read_worktree_lock_reason",
36 "remove_worktree",
37 "repair_worktree",
38 "temporary_worktree",
39 "unlock_worktree",
40]
42import builtins
43import os
44import shutil
45import stat
46import sys
47import tempfile
48import time
49import warnings
50from collections.abc import Callable, Iterable, Iterator, Sequence
51from contextlib import contextmanager
52from pathlib import Path
53from typing import Any
55from .errors import CommitError, HookError
56from .objects import Blob, Commit, ObjectID, Tag, Tree
57from .refs import SYMREF, Ref, local_branch_name
58from .repo import (
59 GITDIR,
60 WORKTREES,
61 Repo,
62 check_user_identity,
63 get_user_identity,
64)
65from .trailers import add_trailer_to_message
class WorkTreeInfo:
    """Snapshot of the state of one worktree.

    Attributes:
      path: Filesystem location of the worktree
      head: SHA of the commit HEAD points at, if known
      branch: Ref HEAD points at (None when HEAD is detached)
      bare: True when the entry describes a bare repository
      detached: True when HEAD does not point at a branch
      locked: True when the worktree is locked
      prunable: True when the worktree may be pruned
      lock_reason: Why the worktree is locked, if a reason was recorded
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: Ref | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Create a WorkTreeInfo record.

        Args:
          path: Filesystem location of the worktree
          head: SHA of the commit HEAD points at
          branch: Ref HEAD points at (None when detached)
          bare: Whether the entry describes a bare repository
          detached: Whether HEAD is detached
          locked: Whether the worktree is locked
          prunable: Whether the worktree may be pruned
          lock_reason: Reason recorded for the lock, if any
        """
        self.path = path
        self.head = head
        self.branch = branch
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short debugging representation (path, branch, detached)."""
        return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})"

    def __eq__(self, other: object) -> bool:
        """Compare every attribute with another WorkTreeInfo."""
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        compared = (
            "path",
            "head",
            "branch",
            "bare",
            "detached",
            "locked",
            "prunable",
            "lock_reason",
        )
        # all() stops at the first differing attribute, in declaration order.
        return all(getattr(self, name) == getattr(other, name) for name in compared)

    def open(self) -> WorkTree:
        """Open the worktree described by this record.

        Returns:
          A WorkTree bound to a Repo opened at ``self.path``

        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)
class WorkTreeContainer:
    """Object-style front end to the module-level worktree functions.

    Every method forwards to the matching ``*_worktree(s)`` function with
    the repository this container was created for, much like
    RefsContainer groups reference operations.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
          repo: Repository whose worktrees are managed
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return a WorkTreeInfo for every worktree of the repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        return list_worktrees(self._repo)

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree.

        Args:
          path: Where the new worktree should live
          branch: Branch to check out in the new worktree
          commit: Commit to check out (gives a detached HEAD)
          force: Create even if the branch is checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: Do not fail when the directory already exists

        Returns:
          The repository object of the new worktree
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Delete a worktree.

        Args:
          path: Location of the worktree to delete
          force: Delete even when there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Drop administrative files of worktrees that no longer exist.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Report what would be removed without removing it

        Returns:
          Identifiers of the pruned worktrees
        """
        return prune_worktrees(self._repo, expire=expire, dry_run=dry_run)

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
    ) -> None:
        """Relocate a worktree.

        Args:
          old_path: Present location of the worktree
          new_path: Destination for the worktree
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Lock a worktree so that it is not pruned.

        Args:
          path: Location of the worktree to lock
          reason: Optional explanation stored with the lock
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Release the lock on a worktree.

        Args:
          path: Location of the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def repair(
        self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
    ) -> builtins.list[str]:
        """Repair worktree administrative files.

        Args:
          paths: Worktree paths to repair. When None, the links from the
            main repository to all linked worktrees are repaired.

        Returns:
          Paths of the worktrees that were repaired
        """
        return repair_worktree(self._repo, paths=paths)

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield a WorkTreeInfo for each worktree."""
        for info in self.list():
            yield info
class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, resetting the index and
    managing the sparse-checkout configuration.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
          repo: The repository this working tree belongs to
          path: Path to the working tree directory
        """
        self._repo = repo
        # os.fsdecode accepts str, bytes and PathLike and always yields str.
        self.path: str = os.path.abspath(os.fsdecode(path))

    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
    ) -> None:
        """Stage a set of paths.

        Paths that no longer exist, directories for which no index entry can
        be built, and entries that are neither regular files nor symlinks are
        removed from the index. Everything else is hashed, stored in the
        object store and recorded in the index.

        Args:
          fs_paths: A single path or an iterable of paths, relative to the
            repository path

        Raises:
          ValueError: If one of the paths is absolute
        """
        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        root_path_bytes = os.fsencode(self.path)

        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()

        def _discard(tree_path: bytes) -> None:
            # Removing an entry that is already absent is not an error.
            try:
                del index[tree_path]
            except KeyError:
                pass

        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                _discard(tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    _discard(tree_path)
            elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                _discard(tree_path)
            else:
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(self, fs_paths: Sequence[str]) -> None:
        """Unstage specific files in the index.

        Entries that exist in the HEAD tree are reset to the HEAD version;
        entries that do not (newly added files) are removed from the index.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: If a path is neither in the HEAD tree nor in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()
        try:
            commit = self._repo[Ref(b"HEAD")]
        except KeyError:
            # no head mean no commit in the repo
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                try:
                    del index[tree_path]
                except KeyError as exc:
                    # Same message as the path taken when HEAD exists.
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except (FileNotFoundError, NotADirectoryError):
                # The file (or one of its parent directories) is gone from the
                # working tree; fall back to zeroed stat fields, as stage()
                # treats both errors as "file no longer exists".
                pass

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: str | bytes | Callable[[Any, Commit], bytes] | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = Ref(b"HEAD"),
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message, or a callable that takes (repo, commit)
            and returns the message. The callable is invoked after the
            parents of the commit have been set.
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding (defaults to the i18n.commitEncoding setting,
            if set)
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: Whether to sign the commit. If None, the commit.gpgSign
            setting decides (not signing when it is unset). When signing,
            the key ID is taken from the user.signingKey setting.
          signoff: Add Signed-off-by line (DCO) to commit message.
            If None, uses format.signoff config.

        Returns:
          New commit SHA1

        Raises:
          CommitError: If the pre-commit or commit-msg hook fails, or if
            ``ref`` changed while the commit was being created
          ValueError: If ``tree`` is not 40 characters long, or if no
            commit message is available
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                c.parents = merge_heads

        # Handle message after parents are set
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add Signed-off-by trailer
            # Get the committer identity for the signoff
            signoff_identity = committer
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                from dulwich.signature import get_signature_vendor

                vendor = get_signature_vendor(config=config)
                c.gpgsig = vendor.sign(c.as_raw_string(), keyid=keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    from dulwich.signature import get_signature_vendor

                    vendor = get_signature_vendor(config=config)
                    c.gpgsig = vendor.sign(c.as_raw_string(), keyid=keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                # Compare-and-swap: only move the ref if it still points at
                # the head that was read above.
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            except KeyError:
                # The ref does not exist yet: create it instead of updating it.
                c.parents = merge_heads
                if should_sign:
                    from dulwich.signature import get_signature_vendor

                    vendor = get_signature_vendor(config=config)
                    c.gpgsig = vendor.sign(c.as_raw_string(), keyid=keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: ObjectID | None = None) -> None:
        """Reset the index back to a specific tree.

        Path validation follows the core.protectNTFS / core.protectHFS
        settings (defaulting to on for Windows / macOS respectively), file
        modes follow core.filemode and symlink creation follows core.symlinks.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[Ref(b"HEAD")]
            if isinstance(head, Tag):
                # HEAD resolved to a tag object; use the object it points at.
                _cls, obj = head.object
                head = self._repo.get_object(obj)
            assert isinstance(head, Commit)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The option name is given without the section prefix: the section is
        # already passed as the first argument.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:

            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: str | bytes,
                dst: str | bytes,
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                # Symlinks are disabled: write the link target into a plain
                # file instead.
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer()
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Sets core.sparseCheckout and core.sparseCheckoutCone to ``true`` and
        writes the configuration back to disk.
        """
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False
        return sc_cone == b"true"

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Blank lines are skipped and surrounding whitespace is stripped.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            for pat in patterns:
                f.write(pat + "\n")

    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` and ``!/*/``. For each directory to
        include, an inclusion line is added that "undoes" the prior ``!/*/``
        exclude, re-including that directory and everything under it.
        The same line is never added twice.

        Args:
          dirs: Directories to include; leading and trailing slashes are
            ignored and empty names are skipped.
        """
        patterns = ["/*", "!/*/"]
        if dirs:
            for d in dirs:
                d = d.strip("/")
                line = f"/{d}/"
                if d and line not in patterns:
                    patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)
def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the reason stored in a worktree's ``locked`` file.

    Args:
      worktree_path: Path to the worktree's administrative directory

    Returns:
      The stripped contents of the ``locked`` file, or None when the file
      does not exist or cannot be opened
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as handle:
                reason = handle.read()
        except (FileNotFoundError, PermissionError):
            # The file vanished or is unreadable: report "not locked".
            return None
        return reason.strip()
    return None
def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """Collect information about every worktree of a repository.

    The main worktree always comes first, followed by one entry per
    directory under the repository's worktrees administrative directory
    whose ``gitdir`` file can be read.

    Args:
      repo: The repository to list worktrees for

    Returns:
      A list of WorkTreeInfo objects
    """
    # --- main worktree -----------------------------------------------------
    main_info = WorkTreeInfo(
        path=repo.path,
        head=repo.head(),
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as head_file:
            raw_head = head_file.read().strip()
            if raw_head.startswith(SYMREF):
                main_info.branch = Ref(raw_head[len(SYMREF) :].strip())
            else:
                main_info.detached = True
                main_info.branch = None
    except (FileNotFoundError, PermissionError):
        # HEAD cannot be read: report the main worktree as detached.
        main_info.branch = None
        main_info.detached = True

    found = [main_info]

    # --- linked worktrees --------------------------------------------------
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return found

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue

        # The gitdir file holds the location of the worktree's .git entry.
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as gitdir_file:
                gitdir_raw = gitdir_file.read().strip()
                dot_git = os.fsdecode(gitdir_raw)
                # Convert relative path to absolute if needed
                if not os.path.isabs(dot_git):
                    dot_git = os.path.abspath(os.path.join(admin_dir, dot_git))
                checkout_dir = os.path.dirname(dot_git)  # Remove .git suffix
        except (FileNotFoundError, PermissionError):
            # Worktree directory is missing, skip it
            # TODO: Consider adding these as prunable worktrees with a placeholder path
            continue

        info = WorkTreeInfo(
            path=checkout_dir,
            bare=False,
            detached=False,
            locked=False,
            prunable=False,
        )

        # A worktree whose checkout directory is gone can be pruned.
        if info.path and not os.path.exists(info.path):
            info.prunable = True

        # Work out branch / head from the worktree's own HEAD file.
        try:
            with open(os.path.join(admin_dir, "HEAD"), "rb") as head_file:
                raw_head = head_file.read().strip()
                if raw_head.startswith(SYMREF):
                    branch_ref = Ref(raw_head[len(SYMREF) :].strip())
                    info.branch = branch_ref
                    # Resolve ref to get commit sha
                    try:
                        info.head = repo.refs[branch_ref]
                    except KeyError:
                        info.head = None
                else:
                    info.detached = True
                    info.branch = None
                    info.head = raw_head
        except (FileNotFoundError, PermissionError):
            info.head = None
            info.branch = None

        reason = read_worktree_lock_reason(admin_dir)
        if reason is not None:
            info.locked = True
            info.lock_reason = reason

        found.append(info)

    return found
def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    What gets checked out is decided in this order: an explicit ``commit``
    (always detached), then ``branch`` (created from HEAD if it does not
    exist yet), and otherwise the current HEAD (detached).

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (creates if doesn't exist)
      commit: Specific commit to checkout (results in detached HEAD)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists

    Returns:
      The newly created worktree repository

    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        branch = local_branch_name(branch)

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from HEAD. (No commit was
            # given here, otherwise the first branch above would have run.)
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. abspath() normalizes the path first so that a
    # trailing separator or a "."/".." component cannot produce an empty or
    # meaningless identifier.
    from .repo import Repo as RepoClass

    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        from dulwich.refs import HEADREF

        wt_repo.refs.set_symbolic_ref(HEADREF, branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo
def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Remove a worktree.

    Deletes both the worktree's working directory (if it still exists) and
    its administrative directory inside the main repository.

    Args:
      repo: The main repository
      path: Path to the worktree to remove
      force: Force removal even if the worktree is locked

    Raises:
      ValueError: If ``path`` is the main working tree, the worktree
        cannot be found, or it is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    target = os.path.abspath(path)

    # Don't allow removing the main worktree
    if target == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the administrative directory whose gitdir file points at `path`
    worktree_id = None
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            # Stray files are not worktree entries; list_worktrees skips
            # them too. Opening "<file>/gitdir" would raise NotADirectoryError.
            if not os.path.isdir(worktree_path):
                continue
            gitdir_path = os.path.join(worktree_path, GITDIR)

            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

                    if os.path.abspath(wt_dir) == target:
                        worktree_id = entry
                        break
            except (FileNotFoundError, PermissionError):
                continue

    if worktree_id is None:
        raise ValueError(f"Worktree not found: {path}")

    worktree_control_dir = os.path.join(worktrees_dir, worktree_id)

    # A locked worktree may only be removed when forcing
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)
def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Delete administrative entries whose worktree no longer exists.

    An entry counts as stale when its gitdir file cannot be read, or when the
    directory containing the path stored in that file is gone. Entries that
    carry a ``locked`` marker are never pruned.

    Args:
        repo: The main repository
        expire: If given, a stale entry is only pruned once the mtime of its
            administrative directory is at least this many seconds old
        dry_run: Report stale entries without deleting anything

    Returns:
        Identifiers of the entries that were (or, with ``dry_run``, would be)
        pruned
    """
    stale_ids: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return stale_ids

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue
        # Locked worktrees are exempt from pruning
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                raw = fh.read().strip()
            target = os.fsdecode(raw)
            if not os.path.isabs(target):
                target = os.path.abspath(os.path.join(admin_dir, target))
            # The stored path names the worktree's .git file; its parent is
            # the worktree directory itself.
            is_stale = not os.path.exists(os.path.dirname(target))
        except (FileNotFoundError, PermissionError):
            # An unreadable pointer is treated like a missing worktree
            is_stale = True

        # Young entries survive when an expiry threshold is requested
        if is_stale and expire is not None:
            age = now - os.stat(admin_dir).st_mtime
            is_stale = not (age < expire)

        if not is_stale:
            continue

        stale_ids.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return stale_ids
def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Mark a worktree as locked by (re)creating its ``locked`` marker file.

    Args:
        repo: The main repository
        path: Path to the worktree to lock
        reason: Optional text stored in the marker file; the file is left
            empty when no reason is given

    Raises:
        ValueError: If no registered worktree matches ``path``
    """
    identifier = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, identifier, "locked")

    # Opening in "w" mode creates or truncates the marker even without a reason
    with open(marker, "w") as marker_file:
        marker_file.write(reason or "")
def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Unlock a worktree by deleting its ``locked`` marker file.

    Unlocking a worktree that is not locked is a no-op.

    Args:
        repo: The main repository
        path: Path to the worktree to unlock

    Raises:
        ValueError: If no registered worktree matches ``path``
    """
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    lock_path = os.path.join(worktree_control_dir, "locked")
    # Remove directly instead of checking first: a check-then-remove sequence
    # can fail if something else deletes the marker in between.
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        # Already unlocked
        pass
def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Look up the administrative identifier registered for a worktree path.

    Each entry below the repository's worktrees directory is inspected; the
    first one whose gitdir file refers to ``path`` wins. Entries whose gitdir
    file is missing or unreadable are skipped.

    Args:
        repo: The main repository
        path: Path to the worktree

    Returns:
        The worktree identifier (the entry's directory name)

    Raises:
        ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    admin_root = os.path.join(repo.controldir(), WORKTREES)
    candidates = os.listdir(admin_root) if os.path.isdir(admin_root) else []

    for name in candidates:
        admin_dir = os.path.join(admin_root, name)
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                pointer = os.fsdecode(fh.read().strip())
            # Relative pointers are interpreted relative to the entry itself
            if not os.path.isabs(pointer):
                pointer = os.path.abspath(os.path.join(admin_dir, pointer))
            # The pointer names the worktree's .git file; drop that last
            # component to get the worktree directory.
            if os.path.abspath(os.path.dirname(pointer)) == os.path.abspath(path):
                return name
        except (FileNotFoundError, PermissionError):
            continue

    raise ValueError(f"Worktree not found: {path}")
def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
) -> None:
    """Move a worktree to a new location.

    The worktree directory is moved on disk and the ``gitdir`` pointer in its
    administrative directory is rewritten to the new location.

    Args:
        repo: The main repository
        old_path: Current path of the worktree
        new_path: New path for the worktree

    Raises:
        ValueError: If ``old_path`` is the main working tree, the worktree
            doesn't exist, or the new path already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Record an absolute location. Relative pointers are resolved against the
    # administrative directory when read back, so a path relative to the
    # caller's working directory would point at the wrong place.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Update the gitdir pointer in the control directory
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
def repair_worktree(
    repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
) -> list[str]:
    """Repair worktree administrative files.

    This repairs the connection between worktrees and the main repository
    when they have been moved or become corrupted.

    Two modes are supported:

    * With ``paths``: for each given worktree, its ``.git`` file is read to
      locate the administrative directory, and that directory's gitdir file
      is rewritten to point at the worktree's current ``.git`` file.
    * Without ``paths``: for each administrative directory, the worktree's
      ``.git`` file at the recorded location is rewritten if it does not
      point back at that administrative directory.

    Args:
        repo: The main repository
        paths: Optional list of worktree paths to repair. If None (or empty),
            repairs connections from the main repository to all linked
            worktrees.

    Returns:
        List of repaired worktree paths. Only worktrees for which a file was
        actually rewritten are included.

    Raises:
        ValueError: If a specified path is not a valid worktree (no ``.git``
            entry, a ``.git`` file without a ``gitdir: `` prefix, or a
            ``.git`` file that cannot be read or decoded)
    """
    repaired: list[str] = []
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    # An empty sequence is falsy, so it takes the "repair everything" branch
    if paths:
        # Repair specific worktrees
        for path in paths:
            path_str = os.fspath(path)
            if isinstance(path_str, bytes):
                path_str = os.fsdecode(path_str)
            # Absolute form is what gets recorded and reported below
            path_str = os.path.abspath(path_str)

            # Check if this is a linked worktree
            gitdir_file = os.path.join(path_str, ".git")
            if not os.path.exists(gitdir_file):
                raise ValueError(f"Not a valid worktree: {path_str}")

            # Read the .git file to get the worktree control directory
            try:
                with open(gitdir_file, "rb") as f:
                    gitdir_content = f.read().strip()
                    if gitdir_content.startswith(b"gitdir: "):
                        # Strip the 8-byte "gitdir: " prefix; decode() uses
                        # the default codec and may raise UnicodeDecodeError
                        worktree_control_path = gitdir_content[8:].decode()
                    else:
                        # Plain ValueError: not caught by the handler below
                        raise ValueError(f"Invalid .git file in worktree: {path_str}")
            except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
                raise ValueError(
                    f"Cannot read .git file in worktree: {path_str}"
                ) from e

            # Make the path absolute if it's relative
            if not os.path.isabs(worktree_control_path):
                worktree_control_path = os.path.abspath(
                    os.path.join(path_str, worktree_control_path)
                )

            # Update the gitdir file in the worktree control directory
            gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
            # A missing pointer file is silently skipped (nothing is created
            # and the path is not reported as repaired)
            if os.path.exists(gitdir_pointer):
                # Update to point to the current location
                with open(gitdir_pointer, "wb") as f:
                    f.write(os.fsencode(gitdir_file) + b"\n")
                repaired.append(path_str)
    else:
        # Repair from main repository to all linked worktrees
        if not os.path.isdir(worktrees_dir):
            return repaired

        for entry in os.listdir(worktrees_dir):
            worktree_control_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_control_path):
                continue

            # Read the gitdir file to find where the worktree thinks it is
            gitdir_path = os.path.join(worktree_control_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    old_gitdir_location = os.fsdecode(gitdir_contents)
            except (FileNotFoundError, PermissionError):
                # Can't repair if we can't read the gitdir file
                continue

            # NOTE(review): the recorded location is used as read; a relative
            # value is therefore interpreted relative to the current working
            # directory here, not the control directory -- confirm intended.

            # Get the worktree directory (remove .git suffix)
            old_worktree_path = os.path.dirname(old_gitdir_location)

            # Check if the .git file exists at the old location
            # (worktrees that have moved elsewhere are left untouched)
            if os.path.exists(old_gitdir_location):
                # Try to read and update the .git file to ensure it points back correctly
                try:
                    with open(old_gitdir_location, "rb") as f:
                        content = f.read().strip()
                        # Files without the "gitdir: " prefix are ignored
                        if content.startswith(b"gitdir: "):
                            current_pointer = content[8:].decode()
                            if not os.path.isabs(current_pointer):
                                current_pointer = os.path.abspath(
                                    os.path.join(old_worktree_path, current_pointer)
                                )

                            # If it doesn't point to the right place, fix it
                            expected_pointer = worktree_control_path
                            if os.path.abspath(current_pointer) != os.path.abspath(
                                expected_pointer
                            ):
                                # Update the .git file to point to the correct location
                                with open(old_gitdir_location, "wb") as wf:
                                    wf.write(
                                        b"gitdir: "
                                        + os.fsencode(worktree_control_path)
                                        + b"\n"
                                    )
                                repaired.append(old_worktree_path)
                except (PermissionError, UnicodeDecodeError):
                    # Best effort: skip worktrees whose .git file is
                    # unreadable, unwritable or not decodable
                    continue

    return repaired
@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered as a worktree of
    ``repo``. On exit the worktree is unregistered and the directory deleted.
    The directory is deleted even if unregistering the worktree fails; the
    failure is still propagated to the caller.

    Args:
        repo: Dulwich repository object
        prefix: Prefix for the temporary directory name

    Yields:
        The worktree object returned by ``repo.worktrees.add``
    """
    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    try:
        # Add worktree (the directory already exists, hence exist_ok)
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)
        try:
            yield worktree
        finally:
            # Clean up worktree registration
            repo.worktrees.remove(worktree.path)
    finally:
        # Clean up temporary directory; kept in its own ``finally`` so that a
        # failing unregistration cannot leak it.
        if Path(temp_dir).exists():
            shutil.rmtree(temp_dir)