Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/worktree.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# worktree.py -- Working tree operations for Git repositories
2# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Working tree operations for Git repositories."""
24from __future__ import annotations
26__all__ = [
27 "WorkTree",
28 "WorkTreeContainer",
29 "WorkTreeInfo",
30 "add_worktree",
31 "list_worktrees",
32 "lock_worktree",
33 "move_worktree",
34 "prune_worktrees",
35 "read_worktree_lock_reason",
36 "remove_worktree",
37 "repair_worktree",
38 "temporary_worktree",
39 "unlock_worktree",
40]
42import builtins
43import os
44import shutil
45import stat
46import sys
47import tempfile
48import time
49import warnings
50from collections.abc import Callable, Iterable, Iterator, Sequence
51from contextlib import contextmanager
52from pathlib import Path
53from typing import TYPE_CHECKING, Any
55from .errors import CommitError, HookError
56from .objects import Blob, Commit, ObjectID, Tag, Tree
58if TYPE_CHECKING:
59 from .config import Config
60from .refs import SYMREF, Ref, local_branch_name
61from .repo import (
62 GITDIR,
63 WORKTREES,
64 Repo,
65 check_user_identity,
66 get_user_identity,
67)
68from .trailers import add_trailer_to_message
71def _should_use_relative_paths(
72 repo: Repo,
73 relative_paths: bool | None,
74 existing_path: bytes | None = None,
75) -> bool:
76 """Determine whether to use relative paths for gitdir references.
78 Args:
79 repo: The repository
80 relative_paths: Explicit preference (True/False) or None to check config
81 existing_path: Optional existing path to check format (for preserving format)
83 Returns:
84 True if relative paths should be used, False otherwise
85 """
86 if relative_paths is not None:
87 return relative_paths
89 # Check config
90 config = repo.get_config()
91 try:
92 use_relative = config.get_boolean(
93 (b"worktree",), b"useRelativePaths", default=False
94 )
95 if use_relative:
96 return True
97 except KeyError:
98 pass
100 # Preserve existing format if available
101 if existing_path is not None:
102 return not os.path.isabs(os.fsdecode(existing_path))
104 return False
107def _compute_gitdir_path(
108 repo: Repo,
109 gitdir_file: str,
110 worktree_control_dir: str,
111 use_relative: bool,
112) -> str:
113 """Compute the gitdir path and enable extension if needed.
115 Args:
116 repo: The repository
117 gitdir_file: Absolute path to the .git file
118 worktree_control_dir: Absolute path to the worktree control directory
119 use_relative: Whether to use relative paths
121 Returns:
122 The path to write (relative or absolute)
123 """
124 if use_relative:
125 from .repo import _enable_relative_worktrees_extension
127 _enable_relative_worktrees_extension(repo)
128 return os.path.relpath(gitdir_file, worktree_control_dir)
129 else:
130 return gitdir_file
class WorkTreeInfo:
    """Plain record describing one worktree of a repository.

    Attributes:
      path: Path to the worktree
      head: Current HEAD commit SHA
      branch: Current branch (if not detached)
      bare: Whether this is a bare repository
      detached: Whether HEAD is detached
      locked: Whether the worktree is locked
      prunable: Whether the worktree can be pruned
      lock_reason: Reason for locking (if locked)
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: Ref | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the given worktree attributes on the instance.

        Args:
          path: Path to the worktree
          head: Current HEAD commit SHA
          branch: Current branch (if not detached)
          bare: Whether this is a bare repository
          detached: Whether HEAD is detached
          locked: Whether the worktree is locked
          prunable: Whether the worktree can be pruned
          lock_reason: Reason for locking (if locked)
        """
        # Location and what is checked out.
        self.path = path
        self.head = head
        self.branch = branch
        # State flags.
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short representation showing path, branch and detached flag."""
        details = (
            f"path={self.path!r}, branch={self.branch!r}, detached={self.detached}"
        )
        return f"WorkTreeInfo({details})"

    def __eq__(self, other: object) -> bool:
        """Compare all attributes against another WorkTreeInfo.

        Returns NotImplemented for objects that are not WorkTreeInfo instances.
        """
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        compared_fields = (
            "path",
            "head",
            "branch",
            "bare",
            "detached",
            "locked",
            "prunable",
            "lock_reason",
        )
        return all(
            getattr(self, field) == getattr(other, field)
            for field in compared_fields
        )

    def open(self) -> WorkTree:
        """Open the repository at ``self.path`` and wrap it in a WorkTree.

        Returns:
          WorkTree object for this worktree

        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        worktree_repo = Repo(self.path)
        return WorkTree(worktree_repo, self.path)
class WorkTreeContainer:
    """Object-style facade over the module-level worktree functions.

    Each method forwards to the corresponding ``*_worktree(s)`` function with
    the repository this container was created for, similar to how
    RefsContainer manages references.
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
          repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        infos = list_worktrees(self._repo)
        return infos

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
        relative_paths: bool | None = None,
    ) -> Repo:
        """Create a new worktree via ``add_worktree``.

        Args:
          path: Path where the new worktree should be created
          branch: Branch to checkout in the new worktree
          commit: Specific commit to checkout (results in detached HEAD)
          force: Force creation even if branch is already checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: If True, do not raise an error if the directory already exists
          relative_paths: If True, use relative paths for gitdir references.
                         If None, check worktree.useRelativePaths config (defaults to False)

        Returns:
          The newly created worktree repository
        """
        new_repo = add_worktree(
            self._repo,
            path,
            branch=branch,
            commit=commit,
            force=force,
            detach=detach,
            exist_ok=exist_ok,
            relative_paths=relative_paths,
        )
        return new_repo

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Delete a worktree via ``remove_worktree``.

        Args:
          path: Path to the worktree to remove
          force: Force removal even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Clean up administrative files of worktrees that no longer exist.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Don't actually remove anything, just report what would be removed

        Returns:
          List of pruned worktree identifiers
        """
        pruned = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
        relative_paths: bool | None = None,
    ) -> None:
        """Relocate a worktree via ``move_worktree``.

        Args:
          old_path: Current path of the worktree
          new_path: New path for the worktree
          relative_paths: If True, use relative paths for gitdir references.
                         If None, check worktree.useRelativePaths config or preserve existing format
        """
        move_worktree(
            self._repo,
            old_path,
            new_path,
            relative_paths=relative_paths,
        )

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Mark a worktree as locked so that it is not pruned.

        Args:
          path: Path to the worktree to lock
          reason: Optional reason for locking
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Remove the lock from a worktree.

        Args:
          path: Path to the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def repair(
        self,
        paths: Sequence[str | bytes | os.PathLike[str]] | None = None,
        relative_paths: bool | None = None,
    ) -> builtins.list[str]:
        """Fix up worktree administrative files via ``repair_worktree``.

        Args:
          paths: Optional list of worktree paths to repair. If None, repairs
                 connections from the main repository to all linked worktrees.
          relative_paths: If True, use relative paths for gitdir references.
                         If None, check worktree.useRelativePaths config or preserve existing format

        Returns:
          List of repaired worktree paths
        """
        repaired = repair_worktree(
            self._repo,
            paths=paths,
            relative_paths=relative_paths,
        )
        return repaired

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield a WorkTreeInfo for each worktree reported by ``list()``."""
        for info in self.list():
            yield info
class WorkTree:
    """Working tree operations for a Git repository.

    This class provides methods for working with the working tree,
    such as staging files, committing changes, and resetting the index.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
        """Initialize a WorkTree for the given repository.

        The path is decoded to ``str`` if given as bytes and stored as an
        absolute path in ``self.path``.

        Args:
          repo: The repository this working tree belongs to
          path: Path to the working tree directory
        """
        self._repo = repo
        raw_path = os.fspath(path)
        if isinstance(raw_path, bytes):
            self.path: str = os.fsdecode(raw_path)
        else:
            self.path = raw_path
        self.path = os.path.abspath(self.path)

    @staticmethod
    def _discard_index_entry(index: Any, tree_path: bytes) -> None:
        """Delete ``tree_path`` from ``index``, ignoring entries already absent."""
        try:
            del index[tree_path]
        except KeyError:
            pass  # already removed

    @staticmethod
    def _sign_commit(c: Commit, keyid: str | None, config: Config) -> None:
        """Sign commit ``c`` in place using the configured signature vendor."""
        from dulwich.signature import get_signature_vendor

        vendor = get_signature_vendor(config=config)
        c.gpgsig = vendor.sign(c.as_raw_string(), keyid=keyid)

    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
        config: Config | None = None,
    ) -> None:
        """Stage a set of paths.

        For each path: a missing file is removed from the index; a directory
        is recorded only if ``index_entry_from_directory`` yields an entry
        (otherwise any existing entry is removed); anything that is neither a
        regular file nor a symlink is removed from the index; regular files
        and symlinks are normalized, stored as blobs and recorded.

        Args:
          fs_paths: A single path or an iterable of paths, relative to the
            repository path
          config: Repository configuration. If None, falls back to
            ``self._repo.get_config_stack()``.

        Raises:
          ValueError: If any of the paths is absolute
        """
        if config is None:
            config = self._repo.get_config_stack()
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path as a convenience.
        if isinstance(fs_paths, str | bytes | os.PathLike):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self._repo.open_index(config=config)
        blob_normalizer = self._repo.get_blob_normalizer(config=config)
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                self._discard_index_entry(index, tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    self._discard_index_entry(index, tree_path)
            elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Neither regular file nor symlink: not tracked in the index.
                self._discard_index_entry(index, tree_path)
            else:
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

    def unstage(
        self,
        fs_paths: Sequence[str],
        config: Config | None = None,
    ) -> None:
        """Unstage specific files in the index.

        When HEAD cannot be resolved, the paths are simply deleted from the
        index. Otherwise each path is reset to the entry recorded in the HEAD
        commit's tree; paths absent from that tree are deleted from the index.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
          config: Repository configuration. If None, falls back to
            ``self._repo.get_config_stack()``.

        Raises:
          KeyError: If a path is neither in the HEAD tree nor in the index
        """
        if config is None:
            config = self._repo.get_config_stack()
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index(config=config)
        try:
            commit = self._repo[Ref(b"HEAD")]
        except KeyError:
            # no head means no commit in the repo
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except (FileNotFoundError, NotADirectoryError):
                # The file (or one of its parent directories) is gone from the
                # working tree; fall back to zeroed stat fields, as stage()
                # treats both errors as "file no longer exists".
                pass

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: str | bytes | Callable[[Any, Commit], bytes] | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = Ref(b"HEAD"),
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
        config: Config | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (str or bytes), or a callable that takes
            (repo, commit) and returns the message
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding (defaults to the i18n.commitEncoding setting,
            if present)
          ref: Optional ref to commit to (defaults to HEAD).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: Whether to sign the commit. If None, the commit.gpgSign
            setting decides (defaulting to not signing). The key ID is read
            from the user.signingkey setting when signing is enabled.
          signoff: Add Signed-off-by line (DCO) to commit message.
            If None, uses format.signoff config.
          config: Configuration to consult for committer/author identity and
            other commit-time settings. If None, falls back to
            ``self._repo.get_config_stack()``.

        Returns:
          New commit SHA1

        Raises:
          CommitError: If the pre-commit or commit-msg hook fails, or the
            ref changed while the commit was being created
          ValueError: If no message is available or ``tree`` is not a
            40-character hex SHA
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        if config is None:
            config = self._repo.get_config_stack()

        c = Commit()
        if tree is None:
            index = self._repo.open_index(config=config)
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = list(merge_heads)
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                c.parents = list(merge_heads)

        # Handle message after parents are set, so a callback can inspect them
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add Signed-off-by trailer, using the committer identity
            signoff_identity = committer
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                self._sign_commit(c, keyid, config)
            self._repo.object_store.add_object(c)
        else:
            # The reflog entry uses the message as given (before any
            # commit-msg hook rewrite), prefixed with "commit: ".
            reflog_message = b"commit: " + (
                message.encode() if isinstance(message, str) else message
            )
            reflog_timestamp = int(commit_timestamp)
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    self._sign_commit(c, keyid, config)
                self._repo.object_store.add_object(c)
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=reflog_message,
                    committer=committer,
                    timestamp=reflog_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # The ref does not exist yet: create it instead of updating.
                c.parents = list(merge_heads)
                if should_sign:
                    self._sign_commit(c, keyid, config)
                self._repo.object_store.add_object(c)
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=reflog_message,
                    committer=committer,
                    timestamp=reflog_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(
        self,
        tree: ObjectID | None = None,
        config: Config | None = None,
    ) -> None:
        """Reset the index back to a specific tree.

        The ``core.filemode``, ``core.protectNTFS``, ``core.protectHFS`` and
        ``core.symlinks`` settings are read from ``self._repo.get_config()``;
        the ``config`` argument is only used to set up the blob normalizer.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
          config: Stacked configuration used for filter setup. If None,
            falls back to ``self._repo.get_config_stack()``.
        """
        if config is None:
            config = self._repo.get_config_stack()
        stacked_config = config
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[Ref(b"HEAD")]
            if isinstance(head, Tag):
                # HEAD resolved to a tag object: follow it to the tagged object
                _cls, obj = head.object
                head = self._repo.get_object(obj)
            assert isinstance(head, Commit)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The variable names inside the "core" section are "protectNTFS" and
        # "protectHFS"; a name such as "core.protectNTFS" would never match
        # a configured value.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:
            # Symlinks disabled: write the link target into a plain file.
            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: str | bytes,
                dst: str | bytes,
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer(config=stacked_config)
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Sets ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` to
        ``true`` in the repository config and writes it back to disk.
        """
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
            # Only the exact value b"true" counts as enabled here.
            return sc_cone == b"true"
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Lines are stripped of surrounding whitespace and blank lines are
        skipped.

        Returns:
            A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist. Any existing file
        content is replaced; each pattern is written on its own line.

        Args:
            patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            for pat in patterns:
                f.write(pat + "\n")

    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` and ``!/*/``. For each directory to
        include, add an inclusion line that "undoes" the prior ``!/*/``
        'exclude' and re-includes that directory and everything under it.
        Never add the same line twice.

        Args:
            dirs: Directories to include. Leading and trailing slashes are
                stripped; entries that are empty after stripping are ignored.
        """
        patterns = ["/*", "!/*/"]
        if dirs:
            for d in dirs:
                d = d.strip("/")
                line = f"/{d}/"
                if d and line not in patterns:
                    patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)
def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the contents of a worktree's ``locked`` file, if there is one.

    Args:
      worktree_path: Path to the worktree's administrative directory

    Returns:
      The whitespace-stripped lock reason if the worktree is locked (this may
      be an empty string), or None if the ``locked`` file does not exist or
      cannot be read
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as handle:
                contents = handle.read()
        except (FileNotFoundError, PermissionError):
            # Removed or made unreadable between the check and the open.
            return None
        return contents.strip()
    return None
def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The first entry always describes the main worktree. It is followed by one
    entry per directory under the control directory's worktrees folder whose
    ``gitdir`` file can be read; entries whose ``gitdir`` file is missing or
    unreadable are skipped.

    Args:
      repo: The repository to list worktrees for

    Returns:
      A list of WorkTreeInfo objects
    """
    worktrees = []

    # Resolve HEAD for the main worktree. A HEAD that does not resolve is
    # reported as head=None, matching how unresolved branches of linked
    # worktrees are handled below, instead of aborting the whole listing.
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None

    # Add main worktree
    main_wt_info = WorkTreeInfo(
        path=repo.path,
        head=main_head,
        bare=repo.bare,
        detached=False,
        locked=False,
        prunable=False,
    )

    # Get branch info for main worktree
    try:
        with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
            head_contents = f.read().strip()
            if head_contents.startswith(SYMREF):
                ref_name = Ref(head_contents[len(SYMREF) :].strip())
                main_wt_info.branch = ref_name
            else:
                main_wt_info.detached = True
                main_wt_info.branch = None
    except (FileNotFoundError, PermissionError):
        main_wt_info.branch = None
        main_wt_info.detached = True

    worktrees.append(main_wt_info)

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            if not os.path.isdir(worktree_path):
                continue

            wt_info = WorkTreeInfo(
                path="",  # Will be set below
                bare=False,
                detached=False,
                locked=False,
                prunable=False,
            )

            # Read gitdir to get actual worktree path
            gitdir_path = os.path.join(worktree_path, GITDIR)
            try:
                with open(gitdir_path, "rb") as f:
                    gitdir_contents = f.read().strip()
                    # Convert relative path to absolute if needed
                    wt_path = os.fsdecode(gitdir_contents)
                    if not os.path.isabs(wt_path):
                        wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
                    wt_info.path = os.path.dirname(wt_path)  # Remove .git suffix
            except (FileNotFoundError, PermissionError):
                # Worktree directory is missing, skip it
                # TODO: Consider adding these as prunable worktrees with a placeholder path
                continue

            # A recorded path that no longer exists marks the entry prunable
            if wt_info.path and not os.path.exists(wt_info.path):
                wt_info.prunable = True

            # Read HEAD
            head_path = os.path.join(worktree_path, "HEAD")
            try:
                with open(head_path, "rb") as f:
                    head_contents = f.read().strip()
                    if head_contents.startswith(SYMREF):
                        ref_name = Ref(head_contents[len(SYMREF) :].strip())
                        wt_info.branch = ref_name
                        # Resolve ref to get commit sha
                        try:
                            wt_info.head = repo.refs[ref_name]
                        except KeyError:
                            wt_info.head = None
                    else:
                        # Not a symbolic ref: the file content is used as the
                        # head value directly.
                        wt_info.detached = True
                        wt_info.branch = None
                        wt_info.head = head_contents
            except (FileNotFoundError, PermissionError):
                wt_info.head = None
                wt_info.branch = None

            # Check if locked
            lock_reason = read_worktree_lock_reason(worktree_path)
            if lock_reason is not None:
                wt_info.locked = True
                wt_info.lock_reason = lock_reason

            worktrees.append(wt_info)

    return worktrees
def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
    relative_paths: bool | None = None,
) -> Repo:
    """Add a new worktree to the repository.

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (creates if doesn't exist)
      commit: Specific commit to checkout (results in detached HEAD, even
        when ``branch`` is also given)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists
      relative_paths: If True, use relative paths for gitdir references.
                     If None, check worktree.useRelativePaths config (defaults to False)

    Returns:
      The newly created worktree repository

    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Determine whether to use relative paths
    use_relative = _should_use_relative_paths(repo, relative_paths)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        branch = local_branch_name(branch)

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        # An explicit commit always wins and yields a detached HEAD.
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from the current HEAD.
            # (No commit can be set here; that case was handled above.)
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The path is normalised before taking the
    # basename because os.path.basename("dir/") is "", which would produce an
    # empty identifier for paths given with a trailing separator.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(
        path, repo, identifier=identifier, relative_paths=use_relative
    )

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        from dulwich.refs import HEADREF

        wt_repo.refs.set_symbolic_ref(HEADREF, branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index(config=wt_repo.get_config_stack())

    return wt_repo
def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Remove a worktree.

    Deletes both the working directory at ``path`` (if it still exists) and
    the worktree's administrative directory inside the main repository.

    Args:
      repo: The main repository
      path: Path to the worktree to remove
      force: Force removal even if the worktree is locked

    Raises:
      ValueError: If ``path`` is the main working tree, the worktree doesn't
        exist, or it is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Don't allow removing the main worktree
    if os.path.abspath(path) == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Locate the administrative directory via the shared lookup helper;
    # it raises ValueError("Worktree not found: ...") when nothing matches.
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # A locked worktree may only be removed when forced
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)
def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Prune administrative files of worktrees whose directory is gone.

    Args:
      repo: The main repository
      expire: Only prune worktrees older than this many seconds
      dry_run: Don't actually remove anything, just report what would be removed

    Returns:
      List of pruned worktree identifiers
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)

    if not os.path.isdir(admin_root):
        return removed

    now = time.time()

    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)

        # Only directories are worktree entries
        if not os.path.isdir(admin_dir):
            continue

        # Locked worktrees are never pruned
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        # An entry is stale when its gitdir pointer is unreadable or points
        # into a directory that no longer exists.
        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                raw_pointer = fh.read().strip()
            dot_git = os.fsdecode(raw_pointer)
            if not os.path.isabs(dot_git):
                dot_git = os.path.abspath(os.path.join(admin_dir, dot_git))
            # Parent of the .git file is the worktree directory itself
            stale = not os.path.exists(os.path.dirname(dot_git))
        except (FileNotFoundError, PermissionError):
            stale = True

        # Entries younger than the expiry window are kept
        if stale and expire is not None:
            if now - os.stat(admin_dir).st_mtime < expire:
                stale = False

        if not stale:
            continue

        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed
def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Lock a worktree so that it will not be pruned.

    Creates (or truncates) the ``locked`` marker file in the worktree's
    administrative directory and stores the reason in it when one is given.

    Args:
      repo: The main repository
      path: Path to the worktree to lock
      reason: Optional reason for locking
    """
    wt_id = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, wt_id, "locked")

    with open(marker, "w") as fh:
        if reason:
            fh.write(reason)
def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Unlock a worktree.

    Removes the ``locked`` marker file from the worktree's administrative
    directory. Unlocking a worktree that is not locked is a no-op.

    Args:
      repo: The main repository
      path: Path to the worktree to unlock
    """
    worktree_id = _find_worktree_id(repo, path)
    lock_path = os.path.join(repo.controldir(), WORKTREES, worktree_id, "locked")

    # Remove directly instead of checking for existence first, so a marker
    # that disappears between the check and the removal cannot raise.
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        # Not locked (or already unlocked): nothing to do
        pass
def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Find the worktree identifier for the given path.

    Scans the entries of the repository's worktrees directory and returns the
    one whose ``gitdir`` pointer resolves to a ``.git`` file inside ``path``.

    Args:
      repo: The main repository
      path: Path to the worktree

    Returns:
      The worktree identifier

    Raises:
      ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Loop-invariant: normalise the requested path once
    wanted = os.path.abspath(path)
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)

            try:
                with open(os.path.join(worktree_path, GITDIR), "rb") as f:
                    gitdir_contents = f.read().strip()
            except (FileNotFoundError, NotADirectoryError, PermissionError):
                # Missing/unreadable pointer, or a stray non-directory entry
                # in the worktrees directory: not a usable worktree, skip it.
                continue

            wt_path = os.fsdecode(gitdir_contents)
            if not os.path.isabs(wt_path):
                # Relative pointers are relative to the administrative dir
                wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
            wt_dir = os.path.dirname(wt_path)  # Remove .git suffix

            if os.path.abspath(wt_dir) == wanted:
                return entry

    raise ValueError(f"Worktree not found: {path}")
def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
    relative_paths: bool | None = None,
) -> None:
    """Relocate a linked worktree and update its gitdir pointer.

    Args:
      repo: The main repository
      old_path: Current path of the worktree
      new_path: New path for the worktree
      relative_paths: If True, use relative paths for gitdir references.
                     If None, check worktree.useRelativePaths config or preserve existing format

    Raises:
      ValueError: If the worktree doesn't exist or new path already exists
    """
    source = os.fspath(old_path)
    destination = os.fspath(new_path)
    if isinstance(source, bytes):
        source = os.fsdecode(source)
    if isinstance(destination, bytes):
        destination = os.fsdecode(destination)

    # The main working tree cannot be relocated
    if os.path.abspath(source) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Refuse to overwrite anything at the destination
    if os.path.exists(destination):
        raise ValueError(f"Path already exists: {destination}")

    # Locate the administrative directory of the worktree
    wt_id = _find_worktree_id(repo, source)
    control_dir = os.path.join(repo.controldir(), WORKTREES, wt_id)
    pointer_file = os.path.join(control_dir, GITDIR)

    # The current pointer is passed on when choosing the path format
    try:
        with open(pointer_file, "rb") as fh:
            previous_pointer = fh.read().strip()
    except (FileNotFoundError, PermissionError):
        previous_pointer = None

    use_relative = _should_use_relative_paths(repo, relative_paths, previous_pointer)

    # Move the working directory itself
    shutil.move(source, destination)

    # Work out what the pointer should now contain ...
    dot_git_abs = os.path.abspath(os.path.join(destination, ".git"))
    new_pointer = _compute_gitdir_path(repo, dot_git_abs, control_dir, use_relative)

    # ... and record it in the administrative directory
    with open(pointer_file, "wb") as fh:
        fh.write(os.fsencode(new_pointer) + b"\n")
def repair_worktree(
    repo: Repo,
    paths: Sequence[str | bytes | os.PathLike[str]] | None = None,
    relative_paths: bool | None = None,
) -> list[str]:
    """Repair worktree administrative files.

    This repairs the connection between worktrees and the main repository
    when they have been moved or become corrupted.

    Args:
      repo: The main repository
      paths: Optional list of worktree paths to repair. If None (or empty),
             repairs connections from the main repository to all linked
             worktrees.
      relative_paths: If True, use relative paths for gitdir references.
                     If None, check worktree.useRelativePaths config or preserve existing format

    Returns:
      List of repaired worktree paths

    Raises:
      ValueError: If a specified path is not a valid worktree
    """
    repaired: list[str] = []

    if paths:
        # Repair specific worktrees: rewrite the control directory's gitdir
        # pointer so that it refers to the worktree's current location.
        for path in paths:
            path_str = os.fspath(path)
            if isinstance(path_str, bytes):
                path_str = os.fsdecode(path_str)
            path_str = os.path.abspath(path_str)

            # Check if this is a linked worktree
            gitdir_file = os.path.join(path_str, ".git")
            if not os.path.exists(gitdir_file):
                raise ValueError(f"Not a valid worktree: {path_str}")

            # Read the .git file to get the worktree control directory.
            # IsADirectoryError covers a ".git" that is a directory rather
            # than a worktree pointer file.
            try:
                with open(gitdir_file, "rb") as f:
                    gitdir_content = f.read().strip()
                if not gitdir_content.startswith(b"gitdir: "):
                    # Plain ValueError is not caught by the handler below
                    raise ValueError(f"Invalid .git file in worktree: {path_str}")
                worktree_control_path = gitdir_content[8:].decode()
            except (
                FileNotFoundError,
                IsADirectoryError,
                PermissionError,
                UnicodeDecodeError,
            ) as e:
                raise ValueError(
                    f"Cannot read .git file in worktree: {path_str}"
                ) from e

            # Make the path absolute if it's relative
            if not os.path.isabs(worktree_control_path):
                worktree_control_path = os.path.abspath(
                    os.path.join(path_str, worktree_control_path)
                )

            # Update the gitdir file in the worktree control directory
            gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
            if not os.path.exists(gitdir_pointer):
                continue

            # Read existing path to check format
            existing_path = None
            try:
                with open(gitdir_pointer, "rb") as f:
                    existing_path = f.read().strip()
            except (FileNotFoundError, PermissionError):
                pass

            # Determine which format to use for this worktree
            use_relative = _should_use_relative_paths(
                repo, relative_paths, existing_path
            )

            # Compute the path to write
            gitdir_path_to_write = _compute_gitdir_path(
                repo, gitdir_file, worktree_control_path, use_relative
            )

            # Update to point to the current location
            with open(gitdir_pointer, "wb") as f:
                f.write(os.fsencode(gitdir_path_to_write) + b"\n")
            repaired.append(path_str)

        return repaired

    # Repair from main repository to all linked worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(worktrees_dir):
        return repaired

    for entry in os.listdir(worktrees_dir):
        worktree_control_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_control_path):
            continue

        # Read the gitdir file to find where the worktree thinks it is
        try:
            with open(os.path.join(worktree_control_path, GITDIR), "rb") as f:
                gitdir_contents = f.read().strip()
        except (FileNotFoundError, PermissionError):
            # Can't repair if we can't read the gitdir file
            continue

        # A relative pointer is relative to the control directory, not to the
        # process's current directory; resolve it the same way the other
        # worktree functions in this module do.
        old_gitdir_location = os.fsdecode(gitdir_contents)
        if not os.path.isabs(old_gitdir_location):
            old_gitdir_location = os.path.abspath(
                os.path.join(worktree_control_path, old_gitdir_location)
            )

        # Get the worktree directory (remove .git suffix)
        old_worktree_path = os.path.dirname(old_gitdir_location)

        # Nothing to fix if the .git file is not at the recorded location
        if not os.path.exists(old_gitdir_location):
            continue

        # Ensure the worktree's .git file points back at this control dir
        try:
            with open(old_gitdir_location, "rb") as f:
                content = f.read().strip()
            if not content.startswith(b"gitdir: "):
                continue

            current_pointer = content[8:].decode()
            if not os.path.isabs(current_pointer):
                current_pointer = os.path.abspath(
                    os.path.join(old_worktree_path, current_pointer)
                )

            # Already pointing to the right place
            if os.path.abspath(current_pointer) == os.path.abspath(
                worktree_control_path
            ):
                continue

            # Determine which format to use
            use_relative = _should_use_relative_paths(
                repo, relative_paths, gitdir_contents
            )

            # Compute the path to write (from worktree to control dir)
            pointer_to_write = _compute_gitdir_path(
                repo,
                worktree_control_path,
                old_worktree_path,
                use_relative,
            )

            # Update the .git file to point to the correct location
            with open(old_gitdir_location, "wb") as wf:
                wf.write(b"gitdir: " + os.fsencode(pointer_to_write) + b"\n")
            repaired.append(old_worktree_path)
        except (
            FileNotFoundError,
            IsADirectoryError,
            PermissionError,
            UnicodeDecodeError,
        ):
            # Best effort: skip worktrees whose .git file cannot be processed
            continue

    return repaired
@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered as a worktree of
    ``repo``. On exit the worktree registration is removed and the temporary
    directory is deleted; the directory is deleted even when removing the
    registration fails.

    Args:
      repo: Dulwich repository object
      prefix: Prefix for the temporary directory name

    Yields:
      The object returned by ``repo.worktrees.add`` for the new worktree
    """
    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix=prefix)

    try:
        # Add worktree (the directory already exists, hence exist_ok)
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)

        try:
            yield worktree
        finally:
            # Clean up worktree registration
            repo.worktrees.remove(worktree.path)
    finally:
        # Always clean up the temporary directory, even if registration or
        # de-registration of the worktree raised.
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)