Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1074 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence 

38from io import BytesIO 

39from types import TracebackType 

40from typing import ( 

41 TYPE_CHECKING, 

42 Any, 

43 BinaryIO, 

44 TypeVar, 

45) 

46 

47if TYPE_CHECKING: 

48 # There are no circular imports here, but we try to defer imports as long 

49 # as possible to reduce start-up time for anything that doesn't need 

50 # these imports. 

51 from .attrs import GitAttributes 

52 from .config import ConditionMatcher, ConfigFile, StackedConfig 

53 from .diff_tree import RenameDetector 

54 from .filters import FilterBlobNormalizer, FilterContext 

55 from .index import Index 

56 from .notes import Notes 

57 from .object_store import BaseObjectStore, GraphWalker 

58 from .pack import UnpackedObject 

59 from .rebase import RebaseStateManager 

60 from .walk import Walker 

61 from .worktree import WorkTree 

62 

63from . import reflog, replace_me 

64from .errors import ( 

65 NoIndexPresent, 

66 NotBlobError, 

67 NotCommitError, 

68 NotGitRepository, 

69 NotTagError, 

70 NotTreeError, 

71 RefFormatError, 

72) 

73from .file import GitFile 

74from .hooks import ( 

75 CommitMsgShellHook, 

76 Hook, 

77 PostCommitShellHook, 

78 PostReceiveShellHook, 

79 PreCommitShellHook, 

80) 

81from .object_store import ( 

82 DiskObjectStore, 

83 MemoryObjectStore, 

84 MissingObjectFinder, 

85 ObjectStoreGraphWalker, 

86 PackBasedObjectStore, 

87 PackCapableObjectStore, 

88 find_shallow, 

89 peel_sha, 

90) 

91from .objects import ( 

92 Blob, 

93 Commit, 

94 ObjectID, 

95 ShaFile, 

96 Tag, 

97 Tree, 

98 check_hexsha, 

99 valid_hexsha, 

100) 

101from .pack import generate_unpacked_objects 

102from .refs import ( 

103 ANNOTATED_TAG_SUFFIX, # noqa: F401 

104 LOCAL_TAG_PREFIX, # noqa: F401 

105 SYMREF, # noqa: F401 

106 DictRefsContainer, 

107 DiskRefsContainer, 

108 InfoRefsContainer, # noqa: F401 

109 Ref, 

110 RefsContainer, 

111 _set_default_branch, 

112 _set_head, 

113 _set_origin_head, 

114 check_ref_format, # noqa: F401 

115 extract_branch_name, 

116 is_per_worktree_ref, 

117 local_branch_name, 

118 read_packed_refs, # noqa: F401 

119 read_packed_refs_with_peeled, # noqa: F401 

120 serialize_refs, 

121 write_packed_refs, # noqa: F401 

122) 

123 

124CONTROLDIR = ".git" 

125OBJECTDIR = "objects" 

126DEFAULT_OFS_DELTA = True 

127 

128T = TypeVar("T", bound="ShaFile") 

129REFSDIR = "refs" 

130REFSDIR_TAGS = "tags" 

131REFSDIR_HEADS = "heads" 

132INDEX_FILENAME = "index" 

133COMMONDIR = "commondir" 

134GITDIR = "gitdir" 

135WORKTREES = "worktrees" 

136 

137BASE_DIRECTORIES = [ 

138 ["branches"], 

139 [REFSDIR], 

140 [REFSDIR, REFSDIR_TAGS], 

141 [REFSDIR, REFSDIR_HEADS], 

142 ["hooks"], 

143 ["info"], 

144] 

145 

146DEFAULT_BRANCH = b"master" 

147 

148 

class InvalidUserIdentity(Exception):
    """Raised when a user identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the offending identity string for later inspection."""
        self.identity = identity

156 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity could be determined."""

160 

161# TODO(jelmer): Cache? 

162def _get_default_identity() -> tuple[str, str]: 

163 import socket 

164 

165 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

166 username = os.environ.get(name) 

167 if username: 

168 break 

169 else: 

170 username = None 

171 

172 try: 

173 import pwd 

174 except ImportError: 

175 fullname = None 

176 else: 

177 try: 

178 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore] 

179 except KeyError: 

180 fullname = None 

181 else: 

182 if getattr(entry, "gecos", None): 

183 fullname = entry.pw_gecos.split(",")[0] 

184 else: 

185 fullname = None 

186 if username is None: 

187 username = entry.pw_name 

188 if not fullname: 

189 if username is None: 

190 raise DefaultIdentityNotFound("no username found") 

191 fullname = username 

192 email = os.environ.get("EMAIL") 

193 if email is None: 

194 if username is None: 

195 raise DefaultIdentityNotFound("no username found") 

196 email = f"{username}@{socket.gethostname()}" 

197 return (fullname, email) 

198 

199 

def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes:
    """Determine the identity to use for new commits.

    Lookup order:

    1. ``GIT_<KIND>_NAME`` / ``GIT_<KIND>_EMAIL`` environment variables
       (only consulted when *kind* is given, usually "AUTHOR" or
       "COMMITTER").
    2. ``user.name`` / ``user.email`` from *config*.
    3. The host system default identity (gecos field, ``$EMAIL``,
       ``$USER@$(hostname -f)``).

    Args:
      config: Configuration stack to read from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity bytestring of the form ``name <email>``
    """
    user: bytes | None = None
    email: bytes | None = None

    if kind:
        env_user = os.environ.get(f"GIT_{kind}_NAME")
        env_email = os.environ.get(f"GIT_{kind}_EMAIL")
        if env_user is not None:
            user = env_user.encode("utf-8")
        if env_email is not None:
            email = env_email.encode("utf-8")

    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None

    default_user, default_email = _get_default_identity()
    if user is None:
        user = default_user.encode("utf-8")
    if email is None:
        email = default_email.encode("utf-8")

    # Tolerate an email the user already wrapped in angle brackets.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

249 

250 

def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``Name <email>`` and contains neither
    NUL nor newline bytes.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """

    def _invalid() -> "InvalidUserIdentity":
        return InvalidUserIdentity(identity.decode("utf-8", "replace"))

    try:
        _name, rest = identity.split(b" <", 1)
    except ValueError as exc:
        raise _invalid() from exc
    if b">" not in rest:
        raise _invalid()
    if b"\0" in identity or b"\n" in identity:
        raise _invalid()

267 

268 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
      <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Reject anything that is not a well-formed hex SHA.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

300 

301 

def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into a bytestring.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

321 

322 

def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    Args:
      path: Path to hide; on other platforms this is a no-op.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        # Build a typed prototype for kernel32!SetFileAttributesW so ctypes
        # performs proper argument conversion for the wide-string path.
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Failure is deliberately ignored: hiding is best-effort and some
        # filesystems do not support the attribute.
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here

344 

345 

346def parse_shared_repository( 

347 value: str | bytes | bool, 

348) -> tuple[int | None, int | None]: 

349 """Parse core.sharedRepository configuration value. 

350 

351 Args: 

352 value: Configuration value (string, bytes, or boolean) 

353 

354 Returns: 

355 tuple of (file_mask, directory_mask) or (None, None) if not shared 

356 

357 The masks are permission bits to apply via chmod. 

358 """ 

359 if isinstance(value, bytes): 

360 value = value.decode("utf-8", errors="replace") 

361 

362 # Handle boolean values 

363 if isinstance(value, bool): 

364 if value: 

365 # true = group (same as "group") 

366 return (0o664, 0o2775) 

367 else: 

368 # false = umask (use system umask, no adjustment) 

369 return (None, None) 

370 

371 # Handle string values 

372 value_lower = value.lower() 

373 

374 if value_lower in ("false", "0", ""): 

375 # Use umask (no adjustment) 

376 return (None, None) 

377 

378 if value_lower in ("true", "1", "group"): 

379 # Group writable (with setgid bit) 

380 return (0o664, 0o2775) 

381 

382 if value_lower in ("all", "world", "everybody", "2"): 

383 # World readable/writable (with setgid bit) 

384 return (0o666, 0o2777) 

385 

386 if value_lower == "umask": 

387 # Explicitly use umask 

388 return (None, None) 

389 

390 # Try to parse as octal 

391 if value.startswith("0"): 

392 try: 

393 mode = int(value, 8) 

394 # For directories, add execute bits where read bits are set 

395 # and add setgid bit for shared repositories 

396 dir_mode = mode | 0o2000 # Add setgid bit 

397 if mode & 0o004: 

398 dir_mode |= 0o001 

399 if mode & 0o040: 

400 dir_mode |= 0o010 

401 if mode & 0o400: 

402 dir_mode |= 0o100 

403 return (mode, dir_mode) 

404 except ValueError: 

405 pass 

406 

407 # Default to umask for unrecognized values 

408 return (None, None) 

409 

410 

411class ParentsProvider: 

412 """Provider for commit parent information.""" 

413 

414 def __init__( 

415 self, 

416 store: "BaseObjectStore", 

417 grafts: dict[bytes, list[bytes]] = {}, 

418 shallows: Iterable[bytes] = [], 

419 ) -> None: 

420 """Initialize ParentsProvider. 

421 

422 Args: 

423 store: Object store to use 

424 grafts: Graft information 

425 shallows: Shallow commit SHAs 

426 """ 

427 self.store = store 

428 self.grafts = grafts 

429 self.shallows = set(shallows) 

430 

431 # Get commit graph once at initialization for performance 

432 self.commit_graph = store.get_commit_graph() 

433 

434 def get_parents( 

435 self, commit_id: bytes, commit: Commit | None = None 

436 ) -> list[bytes]: 

437 """Get parents for a commit using the parents provider.""" 

438 try: 

439 return self.grafts[commit_id] 

440 except KeyError: 

441 pass 

442 if commit_id in self.shallows: 

443 return [] 

444 

445 # Try to use commit graph for faster parent lookup 

446 if self.commit_graph: 

447 parents = self.commit_graph.get_parents(commit_id) 

448 if parents is not None: 

449 return parents 

450 

451 # Fallback to reading the commit object 

452 if commit is None: 

453 obj = self.store[commit_id] 

454 assert isinstance(obj, Commit) 

455 commit = obj 

456 parents = commit.parents 

457 assert isinstance(parents, list) 

458 return parents 

459 

460 

461class BaseRepo: 

462 """Base class for a git repository. 

463 

464 This base class is meant to be used for Repository implementations that e.g. 

465 work on top of a different transport than a standard filesystem path. 

466 

467 Attributes: 

468 object_store: Dictionary-like object for accessing 

469 the objects 

470 refs: Dictionary-like object with the refs in this 

471 repository 

472 """ 

473 

474 def __init__( 

475 self, object_store: "PackCapableObjectStore", refs: RefsContainer 

476 ) -> None: 

477 """Open a repository. 

478 

479 This shouldn't be called directly, but rather through one of the 

480 base classes, such as MemoryRepo or Repo. 

481 

482 Args: 

483 object_store: Object store to use 

484 refs: Refs container to use 

485 """ 

486 self.object_store = object_store 

487 self.refs = refs 

488 

489 self._graftpoints: dict[bytes, list[bytes]] = {} 

490 self.hooks: dict[str, Hook] = {} 

491 

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # Abstract: storage-specific subclasses must implement the probe.
        raise NotImplementedError(self._determine_file_mode)

498 

499 def _determine_symlinks(self) -> bool: 

500 """Probe the filesystem to determine whether symlinks can be created. 

501 

502 Returns: True if symlinks can be created, False otherwise. 

503 """ 

504 # For now, just mimic the old behaviour 

505 return sys.platform != "win32" 

506 

507 def _init_files( 

508 self, 

509 bare: bool, 

510 symlinks: bool | None = None, 

511 format: int | None = None, 

512 shared_repository: str | bool | None = None, 

513 ) -> None: 

514 """Initialize a default set of named files.""" 

515 from .config import ConfigFile 

516 

517 self._put_named_file("description", b"Unnamed repository") 

518 f = BytesIO() 

519 cf = ConfigFile() 

520 if format is None: 

521 format = 0 

522 if format not in (0, 1): 

523 raise ValueError(f"Unsupported repository format version: {format}") 

524 cf.set("core", "repositoryformatversion", str(format)) 

525 if self._determine_file_mode(): 

526 cf.set("core", "filemode", True) 

527 else: 

528 cf.set("core", "filemode", False) 

529 

530 if symlinks is None and not bare: 

531 symlinks = self._determine_symlinks() 

532 

533 if symlinks is False: 

534 cf.set("core", "symlinks", symlinks) 

535 

536 cf.set("core", "bare", bare) 

537 cf.set("core", "logallrefupdates", True) 

538 

539 # Set shared repository if specified 

540 if shared_repository is not None: 

541 if isinstance(shared_repository, bool): 

542 cf.set("core", "sharedRepository", shared_repository) 

543 else: 

544 cf.set("core", "sharedRepository", shared_repository) 

545 

546 cf.write_to_file(f) 

547 self._put_named_file("config", f.getvalue()) 

548 self._put_named_file(os.path.join("info", "exclude"), b"") 

549 

    def get_named_file(self, path: str) -> BinaryIO | None:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        # Abstract: concrete subclasses provide the actual lookup.
        raise NotImplementedError(self.get_named_file)

562 

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        # Abstract: concrete subclasses provide the actual write.
        raise NotImplementedError(self._put_named_file)

571 

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Args:
          path: The path to the file, relative to the control dir.
        """
        # Abstract: concrete subclasses provide the actual deletion.
        raise NotImplementedError(self._del_named_file)

575 

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        # Abstract: bare/memory repositories may have no index at all.
        raise NotImplementedError(self.open_index)

584 

585 def fetch( 

586 self, 

587 target: "BaseRepo", 

588 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]] 

589 | None = None, 

590 progress: Callable[..., None] | None = None, 

591 depth: int | None = None, 

592 ) -> dict[bytes, bytes]: 

593 """Fetch objects into another repository. 

594 

595 Args: 

596 target: The target repository 

597 determine_wants: Optional function to determine what refs to 

598 fetch. 

599 progress: Optional progress function 

600 depth: Optional shallow fetch depth 

601 Returns: The local refs 

602 """ 

603 if determine_wants is None: 

604 determine_wants = target.object_store.determine_wants_all 

605 count, pack_data = self.fetch_pack_data( 

606 determine_wants, 

607 target.get_graph_walker(), 

608 progress=progress, 

609 depth=depth, 

610 ) 

611 target.object_store.add_pack_data(count, pack_data, progress) 

612 return self.get_refs() 

613 

614 def fetch_pack_data( 

615 self, 

616 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]], 

617 graph_walker: "GraphWalker", 

618 progress: Callable[[bytes], None] | None, 

619 *, 

620 get_tagged: Callable[[], dict[bytes, bytes]] | None = None, 

621 depth: int | None = None, 

622 ) -> tuple[int, Iterator["UnpackedObject"]]: 

623 """Fetch the pack data required for a set of revisions. 

624 

625 Args: 

626 determine_wants: Function that takes a dictionary with heads 

627 and returns the list of heads to fetch. 

628 graph_walker: Object that can iterate over the list of revisions 

629 to fetch and has an "ack" method that will be called to acknowledge 

630 that a revision is present. 

631 progress: Simple progress function that will be called with 

632 updated progress strings. 

633 get_tagged: Function that returns a dict of pointed-to sha -> 

634 tag sha for including tags. 

635 depth: Shallow fetch depth 

636 Returns: count and iterator over pack data 

637 """ 

638 missing_objects = self.find_missing_objects( 

639 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

640 ) 

641 if missing_objects is None: 

642 return 0, iter([]) 

643 remote_has = missing_objects.get_remote_has() 

644 object_ids = list(missing_objects) 

645 return len(object_ids), generate_unpacked_objects( 

646 self.object_store, object_ids, progress=progress, other_haves=remote_has 

647 ) 

648 

    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[bytes, bytes]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
          in the shallow short-circuit case where no pack should be sent.
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the client's pre-fetch shallow set so we can compute
        # which commits became newly (un)shallow below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
              commit: Commit object

            Returns:
              List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

744 

745 def generate_pack_data( 

746 self, 

747 have: set[ObjectID], 

748 want: set[ObjectID], 

749 *, 

750 shallow: set[ObjectID] | None = None, 

751 progress: Callable[[str], None] | None = None, 

752 ofs_delta: bool | None = None, 

753 ) -> tuple[int, Iterator["UnpackedObject"]]: 

754 """Generate pack data objects for a set of wants/haves. 

755 

756 Args: 

757 have: List of SHA1s of objects that should not be sent 

758 want: List of SHA1s of objects that should be sent 

759 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits) 

760 ofs_delta: Whether OFS deltas can be included 

761 progress: Optional progress reporting method 

762 """ 

763 if shallow is None: 

764 shallow = self.get_shallow() 

765 return self.object_store.generate_pack_data( 

766 have, 

767 want, 

768 shallow=shallow, 

769 progress=progress, 

770 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA, 

771 ) 

772 

773 def get_graph_walker( 

774 self, heads: list[ObjectID] | None = None 

775 ) -> ObjectStoreGraphWalker: 

776 """Retrieve a graph walker. 

777 

778 A graph walker is used by a remote repository (or proxy) 

779 to find out which objects are present in this repository. 

780 

781 Args: 

782 heads: Repository heads to use (optional) 

783 Returns: A graph walker object 

784 """ 

785 if heads is None: 

786 heads = [ 

787 sha 

788 for sha in self.refs.as_dict(b"refs/heads").values() 

789 if sha in self.object_store 

790 ] 

791 parents_provider = ParentsProvider(self.object_store) 

792 return ObjectStoreGraphWalker( 

793 heads, 

794 parents_provider.get_parents, 

795 shallow=self.get_shallow(), 

796 update_shallow=self.update_shallow, 

797 ) 

798 

799 def get_refs(self) -> dict[bytes, bytes]: 

800 """Get dictionary with all refs. 

801 

802 Returns: A ``dict`` mapping ref names to SHA1s 

803 """ 

804 return self.refs.as_dict() 

805 

806 def head(self) -> bytes: 

807 """Return the SHA1 pointed at by HEAD.""" 

808 # TODO: move this method to WorkTree 

809 return self.refs[b"HEAD"] 

810 

811 def _get_object(self, sha: bytes, cls: type[T]) -> T: 

812 assert len(sha) in (20, 40) 

813 ret = self.get_object(sha) 

814 if not isinstance(ret, cls): 

815 if cls is Commit: 

816 raise NotCommitError(ret.id) 

817 elif cls is Blob: 

818 raise NotBlobError(ret.id) 

819 elif cls is Tree: 

820 raise NotTreeError(ret.id) 

821 elif cls is Tag: 

822 raise NotTagError(ret.id) 

823 else: 

824 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

825 return ret 

826 

827 def get_object(self, sha: bytes) -> ShaFile: 

828 """Retrieve the object with the specified SHA. 

829 

830 Args: 

831 sha: SHA to retrieve 

832 Returns: A ShaFile object 

833 Raises: 

834 KeyError: when the object can not be found 

835 """ 

836 return self.object_store[sha] 

837 

838 def parents_provider(self) -> ParentsProvider: 

839 """Get a parents provider for this repository. 

840 

841 Returns: 

842 ParentsProvider instance configured with grafts and shallows 

843 """ 

844 return ParentsProvider( 

845 self.object_store, 

846 grafts=self._graftpoints, 

847 shallows=self.get_shallow(), 

848 ) 

849 

850 def get_parents(self, sha: bytes, commit: Commit | None = None) -> list[bytes]: 

851 """Retrieve the parents of a specific commit. 

852 

853 If the specific commit is a graftpoint, the graft parents 

854 will be returned instead. 

855 

856 Args: 

857 sha: SHA of the commit for which to retrieve the parents 

858 commit: Optional commit matching the sha 

859 Returns: List of parents 

860 """ 

861 return self.parents_provider().get_parents(sha, commit) 

862 

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        # Abstract: storage-specific subclasses provide the config source.
        raise NotImplementedError(self.get_config)

869 

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Returns: `ConfigFile` for the per-worktree configuration.
        """
        # Abstract: only repositories with worktree support implement this.
        raise NotImplementedError(self.get_worktree_config)

873 

    def get_description(self) -> bytes | None:
        """Retrieve the description for this repository.

        Returns: Bytes with the description of the repository
          as set by the user.
        """
        # Abstract: concrete subclasses read the "description" file.
        raise NotImplementedError(self.get_description)

881 

    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        # Abstract: concrete subclasses write the "description" file.
        raise NotImplementedError(self.set_description)

889 

    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance
        """
        # Abstract: rebase state storage depends on the repository backend.
        raise NotImplementedError(self.get_rebase_state_manager)

896 

    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Returns: BlobNormalizer instance
        """
        # Abstract: depends on gitattributes/filter support of the backend.
        raise NotImplementedError(self.get_blob_normalizer)

903 

    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        # Abstract: concrete subclasses locate and parse .gitattributes.
        raise NotImplementedError(self.get_gitattributes)

914 

915 def get_config_stack(self) -> "StackedConfig": 

916 """Return a config stack for this repository. 

917 

918 This stack accesses the configuration for both this repository 

919 itself (.git/config) and the global configuration, which usually 

920 lives in ~/.gitconfig. 

921 

922 Returns: `Config` instance for this repository 

923 """ 

924 from .config import ConfigFile, StackedConfig 

925 

926 local_config = self.get_config() 

927 backends: list[ConfigFile] = [local_config] 

928 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

929 backends.append(self.get_worktree_config()) 

930 

931 backends += StackedConfig.default_backends() 

932 return StackedConfig(backends, writable=local_config) 

933 

934 def get_shallow(self) -> set[ObjectID]: 

935 """Get the set of shallow commits. 

936 

937 Returns: Set of shallow commits. 

938 """ 

939 f = self.get_named_file("shallow") 

940 if f is None: 

941 return set() 

942 with f: 

943 return {line.strip() for line in f} 

944 

945 def update_shallow( 

946 self, new_shallow: set[bytes] | None, new_unshallow: set[bytes] | None 

947 ) -> None: 

948 """Update the list of shallow objects. 

949 

950 Args: 

951 new_shallow: Newly shallow objects 

952 new_unshallow: Newly no longer shallow objects 

953 """ 

954 shallow = self.get_shallow() 

955 if new_shallow: 

956 shallow.update(new_shallow) 

957 if new_unshallow: 

958 shallow.difference_update(new_unshallow) 

959 if shallow: 

960 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

961 else: 

962 self._del_named_file("shallow") 

963 

964 def get_peeled(self, ref: Ref) -> ObjectID: 

965 """Get the peeled value of a ref. 

966 

967 Args: 

968 ref: The refname to peel. 

969 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

970 intermediate tags; if the original ref does not point to a tag, 

971 this will equal the original SHA1. 

972 """ 

973 cached = self.refs.get_peeled(ref) 

974 if cached is not None: 

975 return cached 

976 return peel_sha(self.object_store, self.refs[ref])[1].id 

977 

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        A fresh Notes object is constructed on every access from this
        repository's object store and refs container.

        Returns:
          Notes object for accessing notes
        """
        # Imported lazily to avoid a module-level import cycle.
        from .notes import Notes

        return Notes(self.object_store, self.refs)

988 

989 def get_walker( 

990 self, 

991 include: Sequence[bytes] | None = None, 

992 exclude: Sequence[bytes] | None = None, 

993 order: str = "date", 

994 reverse: bool = False, 

995 max_entries: int | None = None, 

996 paths: Sequence[bytes] | None = None, 

997 rename_detector: "RenameDetector | None" = None, 

998 follow: bool = False, 

999 since: int | None = None, 

1000 until: int | None = None, 

1001 queue_cls: type | None = None, 

1002 ) -> "Walker": 

1003 """Obtain a walker for this repository. 

1004 

1005 Args: 

1006 include: Iterable of SHAs of commits to include along with their 

1007 ancestors. Defaults to [HEAD] 

1008 exclude: Iterable of SHAs of commits to exclude along with their 

1009 ancestors, overriding includes. 

1010 order: ORDER_* constant specifying the order of results. 

1011 Anything other than ORDER_DATE may result in O(n) memory usage. 

1012 reverse: If True, reverse the order of output, requiring O(n) 

1013 memory. 

1014 max_entries: The maximum number of entries to yield, or None for 

1015 no limit. 

1016 paths: Iterable of file or subtree paths to show entries for. 

1017 rename_detector: diff.RenameDetector object for detecting 

1018 renames. 

1019 follow: If True, follow path across renames/copies. Forces a 

1020 default rename_detector. 

1021 since: Timestamp to list commits after. 

1022 until: Timestamp to list commits before. 

1023 queue_cls: A class to use for a queue of commits, supporting the 

1024 iterator protocol. The constructor takes a single argument, the Walker. 

1025 

1026 Returns: A `Walker` object 

1027 """ 

1028 from .walk import Walker, _CommitTimeQueue 

1029 

1030 if include is None: 

1031 include = [self.head()] 

1032 

1033 # Pass all arguments to Walker explicitly to avoid type issues with **kwargs 

1034 return Walker( 

1035 self.object_store, 

1036 include, 

1037 exclude=exclude, 

1038 order=order, 

1039 reverse=reverse, 

1040 max_entries=max_entries, 

1041 paths=paths, 

1042 rename_detector=rename_detector, 

1043 follow=follow, 

1044 since=since, 

1045 until=until, 

1046 get_parents=lambda commit: self.get_parents(commit.id, commit), 

1047 queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue, 

1048 ) 

1049 

1050 def __getitem__(self, name: ObjectID | Ref) -> "ShaFile": 

1051 """Retrieve a Git object by SHA1 or ref. 

1052 

1053 Args: 

1054 name: A Git object SHA1 or a ref name 

1055 Returns: A `ShaFile` object, such as a Commit or Blob 

1056 Raises: 

1057 KeyError: when the specified ref or object does not exist 

1058 """ 

1059 if not isinstance(name, bytes): 

1060 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

1061 if len(name) in (20, 40): 

1062 try: 

1063 return self.object_store[name] 

1064 except (KeyError, ValueError): 

1065 pass 

1066 try: 

1067 return self.object_store[self.refs[name]] 

1068 except RefFormatError as exc: 

1069 raise KeyError(name) from exc 

1070 

1071 def __contains__(self, name: bytes) -> bool: 

1072 """Check if a specific Git object or ref is present. 

1073 

1074 Args: 

1075 name: Git object SHA1 or ref name 

1076 """ 

1077 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

1078 return name in self.object_store or name in self.refs 

1079 else: 

1080 return name in self.refs 

1081 

1082 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None: 

1083 """Set a ref. 

1084 

1085 Args: 

1086 name: ref name 

1087 value: Ref value - either a ShaFile object, or a hex sha 

1088 """ 

1089 if name.startswith(b"refs/") or name == b"HEAD": 

1090 if isinstance(value, ShaFile): 

1091 self.refs[name] = value.id 

1092 elif isinstance(value, bytes): 

1093 self.refs[name] = value 

1094 else: 

1095 raise TypeError(value) 

1096 else: 

1097 raise ValueError(name) 

1098 

1099 def __delitem__(self, name: bytes) -> None: 

1100 """Remove a ref. 

1101 

1102 Args: 

1103 name: Name of the ref to remove 

1104 """ 

1105 if name.startswith(b"refs/") or name == b"HEAD": 

1106 del self.refs[name] 

1107 else: 

1108 raise ValueError(name) 

1109 

1110 def _get_user_identity( 

1111 self, config: "StackedConfig", kind: str | None = None 

1112 ) -> bytes: 

1113 """Determine the identity to use for new commits.""" 

1114 warnings.warn( 

1115 "use get_user_identity() rather than Repo._get_user_identity", 

1116 DeprecationWarning, 

1117 ) 

1118 return get_user_identity(config) 

1119 

1120 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

1121 """Add or modify graftpoints. 

1122 

1123 Args: 

1124 updated_graftpoints: Dict of commit shas to list of parent shas 

1125 """ 

1126 # Simple validation 

1127 for commit, parents in updated_graftpoints.items(): 

1128 for sha in [commit, *parents]: 

1129 check_hexsha(sha, "Invalid graftpoint") 

1130 

1131 self._graftpoints.update(updated_graftpoints) 

1132 

1133 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None: 

1134 """Remove graftpoints. 

1135 

1136 Args: 

1137 to_remove: List of commit shas 

1138 """ 

1139 for sha in to_remove: 

1140 del self._graftpoints[sha] 

1141 

1142 def _read_heads(self, name: str) -> list[bytes]: 

1143 f = self.get_named_file(name) 

1144 if f is None: 

1145 return [] 

1146 with f: 

1147 return [line.strip() for line in f.readlines() if line.strip()] 

1148 

1149 def get_worktree(self) -> "WorkTree": 

1150 """Get the working tree for this repository. 

1151 

1152 Returns: 

1153 WorkTree instance for performing working tree operations 

1154 

1155 Raises: 

1156 NotImplementedError: If the repository doesn't support working trees 

1157 """ 

1158 raise NotImplementedError( 

1159 "Working tree operations not supported by this repository type" 

1160 ) 

1161 

1162 @replace_me(remove_in="0.26.0") 

1163 def do_commit( 

1164 self, 

1165 message: bytes | None = None, 

1166 committer: bytes | None = None, 

1167 author: bytes | None = None, 

1168 commit_timestamp: float | None = None, 

1169 commit_timezone: int | None = None, 

1170 author_timestamp: float | None = None, 

1171 author_timezone: int | None = None, 

1172 tree: ObjectID | None = None, 

1173 encoding: bytes | None = None, 

1174 ref: Ref | None = b"HEAD", 

1175 merge_heads: list[ObjectID] | None = None, 

1176 no_verify: bool = False, 

1177 sign: bool = False, 

1178 ) -> bytes: 

1179 """Create a new commit. 

1180 

1181 If not specified, committer and author default to 

1182 get_user_identity(..., 'COMMITTER') 

1183 and get_user_identity(..., 'AUTHOR') respectively. 

1184 

1185 Args: 

1186 message: Commit message (bytes or callable that takes (repo, commit) 

1187 and returns bytes) 

1188 committer: Committer fullname 

1189 author: Author fullname 

1190 commit_timestamp: Commit timestamp (defaults to now) 

1191 commit_timezone: Commit timestamp timezone (defaults to GMT) 

1192 author_timestamp: Author timestamp (defaults to commit 

1193 timestamp) 

1194 author_timezone: Author timestamp timezone 

1195 (defaults to commit timestamp timezone) 

1196 tree: SHA1 of the tree root to use (if not specified the 

1197 current index will be committed). 

1198 encoding: Encoding 

1199 ref: Optional ref to commit to (defaults to current branch). 

1200 If None, creates a dangling commit without updating any ref. 

1201 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

1202 no_verify: Skip pre-commit and commit-msg hooks 

1203 sign: GPG Sign the commit (bool, defaults to False, 

1204 pass True to use default GPG key, 

1205 pass a str containing Key ID to use a specific GPG key) 

1206 

1207 Returns: 

1208 New commit SHA1 

1209 """ 

1210 return self.get_worktree().commit( 

1211 message=message, 

1212 committer=committer, 

1213 author=author, 

1214 commit_timestamp=commit_timestamp, 

1215 commit_timezone=commit_timezone, 

1216 author_timestamp=author_timestamp, 

1217 author_timezone=author_timezone, 

1218 tree=tree, 

1219 encoding=encoding, 

1220 ref=ref, 

1221 merge_heads=merge_heads, 

1222 no_verify=no_verify, 

1223 sign=sign, 

1224 ) 

1225 

1226 

def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` gitlink file and return the path it points at.

    The content must begin with the literal prefix ``gitdir: ``; the
    remainder, with trailing CR/LF stripped, is the control-directory path.

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: if the content does not start with ``gitdir: ``
    """
    prefix = b"gitdir: "
    content = f.read()
    if not content.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return content[len(prefix) :].rstrip(b"\r\n").decode("utf-8")

1240 

1241 

class UnsupportedVersion(Exception):
    """Unsupported repository format version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        # Pass a message to Exception so str(e) is informative instead of
        # empty; the raw version is kept as an attribute for callers.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1252 

1253 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Include the extension name in the exception message so it is
        # visible in tracebacks; it also remains available as an attribute.
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension

1264 

1265 

class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Path to the working copy, or to the control directory when bare.
    path: str
    # True when the repository has no working tree.
    bare: bool
    # Backing object store, created in __init__ from the common directory.
    object_store: DiskObjectStore
    # Filter (smudge/clean) context; initialized to None, created lazily.
    filter_context: "FilterContext | None"

1288 

    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Detects bareness when not given, resolves ``.git`` gitlink files and
        the ``commondir`` indirection, validates the repository format
        version and extensions, and wires up the object store, refs
        container, graftpoints and shell hooks.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if ``root`` does not look like a git repository
          UnsupportedVersion: for repositoryformatversion other than 0 or 1
          UnsupportedExtension: for unrecognised [extensions] entries
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file/dir means non-bare; a root that itself
            # contains object and refs directories means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            # A .git *file* points at the real control directory (linked
            # worktrees, submodules); otherwise .git is the control dir.
            if os.path.isfile(hidden_path):
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # The "commondir" file redirects shared state to the main worktree's
        # control directory; when absent, commondir == controldir.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Missing setting is treated as format version 0.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            # Get shared repository permissions from config
            try:
                shared_value = config.get(("core",), "sharedRepository")
                file_mode, dir_mode = parse_shared_repository(shared_value)
            except KeyError:
                file_mode, dir_mode = None, None

            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR),
                config,
                file_mode=file_mode,
                dir_mode=dir_mode,
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # The shallow file is parsed with the same graftpoint parser and
        # folded into the same mapping.
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None

1420 

1421 def get_worktree(self) -> "WorkTree": 

1422 """Get the working tree for this repository. 

1423 

1424 Returns: 

1425 WorkTree instance for performing working tree operations 

1426 """ 

1427 from .worktree import WorkTree 

1428 

1429 return WorkTree(self, self.path) 

1430 

    def _write_reflog(
        self,
        ref: bytes,
        old_sha: bytes,
        new_sha: bytes,
        committer: bytes | None,
        timestamp: int | None,
        timezone: int | None,
        message: bytes,
    ) -> None:
        """Append one entry to the on-disk reflog for ``ref``.

        Used as the ``logger`` callback of the refs container. Missing
        committer/timestamp/timezone values are filled in from config and
        the current time.

        Args:
          ref: Ref name the entry is for
          old_sha: Previous value of the ref
          new_sha: New value of the ref
          committer: Identity to record; defaults to the configured user
          timestamp: Seconds since the epoch; defaults to now
          timezone: Timezone offset; defaults to 0
          message: Reflog message
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)

        # Get shared repository permissions
        file_mode, dir_mode = self._get_shared_repository_permissions()

        # Create directory with appropriate permissions
        parent_dir = os.path.dirname(path)
        # Create directory tree, setting permissions on each level if needed
        # (os.makedirs is not used so each newly created level can be
        # chmod'ed individually).
        parts = []
        current = parent_dir
        while current and not os.path.exists(current):
            parts.append(current)
            current = os.path.dirname(current)
        parts.reverse()
        for part in parts:
            os.mkdir(part)
            if dir_mode is not None:
                os.chmod(part, dir_mode)
        if committer is None:
            config = self.get_config_stack()
            committer = get_user_identity(config)
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )

        # Set file permissions (open() respects umask, so we need chmod to set the actual mode)
        # Always chmod to ensure correct permissions even if file already existed
        if file_mode is not None:
            os.chmod(path, file_mode)

1481 

1482 def _reflog_path(self, ref: bytes) -> str: 

1483 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1484 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1485 

1486 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1487 return os.path.join(base, "logs", os.fsdecode(ref)) 

1488 

1489 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]: 

1490 """Read reflog entries for a reference. 

1491 

1492 Args: 

1493 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1494 

1495 Yields: 

1496 reflog.Entry objects in chronological order (oldest first) 

1497 """ 

1498 from .reflog import read_reflog 

1499 

1500 path = self._reflog_path(ref) 

1501 try: 

1502 with open(path, "rb") as f: 

1503 yield from read_reflog(f) 

1504 except FileNotFoundError: 

1505 return 

1506 

1507 @classmethod 

1508 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo": 

1509 """Iterate parent directories to discover a repository. 

1510 

1511 Return a Repo object for the first parent directory that looks like a 

1512 Git repository. 

1513 

1514 Args: 

1515 start: The directory to start discovery from (defaults to '.') 

1516 """ 

1517 path = os.path.abspath(start) 

1518 while True: 

1519 try: 

1520 return cls(path) 

1521 except NotGitRepository: 

1522 new_path, _tail = os.path.split(path) 

1523 if new_path == path: # Root reached 

1524 break 

1525 path = new_path 

1526 start_str = os.fspath(start) 

1527 if isinstance(start_str, bytes): 

1528 start_str = start_str.decode("utf-8") 

1529 raise NotGitRepository(f"No git repository was found at {start_str}") 

1530 

    def controldir(self) -> str:
        """Return the path of the control directory.

        For a non-bare repository this is the ``.git`` directory (or the
        directory a ``.git`` gitlink file points at); for a bare repository
        it is the repository root, as resolved in ``__init__``.
        """
        return self._controldir

1534 

    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree (resolved from the ``commondir`` file when the
        repository was opened).
        """
        return self._commondir

1544 

1545 def _determine_file_mode(self) -> bool: 

1546 """Probe the file-system to determine whether permissions can be trusted. 

1547 

1548 Returns: True if permissions can be trusted, False otherwise. 

1549 """ 

1550 fname = os.path.join(self.path, ".probe-permissions") 

1551 with open(fname, "w") as f: 

1552 f.write("") 

1553 

1554 st1 = os.lstat(fname) 

1555 try: 

1556 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1557 except PermissionError: 

1558 return False 

1559 st2 = os.lstat(fname) 

1560 

1561 os.unlink(fname) 

1562 

1563 mode_differs = st1.st_mode != st2.st_mode 

1564 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1565 

1566 return mode_differs and st2_has_exec 

1567 

1568 def _determine_symlinks(self) -> bool: 

1569 """Probe the filesystem to determine whether symlinks can be created. 

1570 

1571 Returns: True if symlinks can be created, False otherwise. 

1572 """ 

1573 # TODO(jelmer): Actually probe disk / look at filesystem 

1574 return sys.platform != "win32" 

1575 

1576 def _get_shared_repository_permissions( 

1577 self, 

1578 ) -> tuple[int | None, int | None]: 

1579 """Get shared repository file and directory permissions from config. 

1580 

1581 Returns: 

1582 tuple of (file_mask, directory_mask) or (None, None) if not shared 

1583 """ 

1584 try: 

1585 config = self.get_config() 

1586 value = config.get(("core",), "sharedRepository") 

1587 return parse_shared_repository(value) 

1588 except KeyError: 

1589 return (None, None) 

1590 

1591 def _put_named_file(self, path: str, contents: bytes) -> None: 

1592 """Write a file to the control dir with the given name and contents. 

1593 

1594 Args: 

1595 path: The path to the file, relative to the control dir. 

1596 contents: A string to write to the file. 

1597 """ 

1598 path = path.lstrip(os.path.sep) 

1599 

1600 # Get shared repository permissions 

1601 file_mode, _ = self._get_shared_repository_permissions() 

1602 

1603 # Create file with appropriate permissions 

1604 if file_mode is not None: 

1605 with GitFile( 

1606 os.path.join(self.controldir(), path), "wb", mask=file_mode 

1607 ) as f: 

1608 f.write(contents) 

1609 else: 

1610 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1611 f.write(contents) 

1612 

1613 def _del_named_file(self, path: str) -> None: 

1614 try: 

1615 os.unlink(os.path.join(self.controldir(), path)) 

1616 except FileNotFoundError: 

1617 return 

1618 

1619 def get_named_file( 

1620 self, 

1621 path: str | bytes, 

1622 basedir: str | None = None, 

1623 ) -> BinaryIO | None: 

1624 """Get a file from the control dir with a specific name. 

1625 

1626 Although the filename should be interpreted as a filename relative to 

1627 the control dir in a disk-based Repo, the object returned need not be 

1628 pointing to a file in that location. 

1629 

1630 Args: 

1631 path: The path to the file, relative to the control dir. 

1632 basedir: Optional argument that specifies an alternative to the 

1633 control dir. 

1634 Returns: An open file object, or None if the file does not exist. 

1635 """ 

1636 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1637 # the dumb web serving code. 

1638 if basedir is None: 

1639 basedir = self.controldir() 

1640 if isinstance(path, bytes): 

1641 path = path.decode("utf-8") 

1642 path = path.lstrip(os.path.sep) 

1643 try: 

1644 return open(os.path.join(basedir, path), "rb") 

1645 except FileNotFoundError: 

1646 return None 

1647 

    def index_path(self) -> str:
        """Return path to the index file.

        The index file lives directly inside the control directory.
        """
        return os.path.join(self.controldir(), INDEX_FILENAME)

1651 

1652 def open_index(self) -> "Index": 

1653 """Open the index for this repository. 

1654 

1655 Raises: 

1656 NoIndexPresent: If no index is present 

1657 Returns: The matching `Index` 

1658 """ 

1659 from .index import Index 

1660 

1661 if not self.has_index(): 

1662 raise NoIndexPresent 

1663 

1664 # Check for manyFiles feature configuration 

1665 config = self.get_config_stack() 

1666 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1667 skip_hash = False 

1668 index_version = None 

1669 

1670 if many_files: 

1671 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1672 try: 

1673 index_version_str = config.get(b"index", b"version") 

1674 index_version = int(index_version_str) 

1675 except KeyError: 

1676 index_version = 4 # Default to version 4 for manyFiles 

1677 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1678 else: 

1679 # Check for explicit index settings 

1680 try: 

1681 index_version_str = config.get(b"index", b"version") 

1682 index_version = int(index_version_str) 

1683 except KeyError: 

1684 index_version = None 

1685 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1686 

1687 # Get shared repository permissions for index file 

1688 file_mode, _ = self._get_shared_repository_permissions() 

1689 

1690 return Index( 

1691 self.index_path(), 

1692 skip_hash=skip_hash, 

1693 version=index_version, 

1694 file_mode=file_mode, 

1695 ) 

1696 

1697 def has_index(self) -> bool: 

1698 """Check if an index is present.""" 

1699 # Bare repos must never have index files; non-bare repos may have a 

1700 # missing index file, which is treated as empty. 

1701 return not self.bare 

1702 

1703 @replace_me(remove_in="0.26.0") 

1704 def stage( 

1705 self, 

1706 fs_paths: str 

1707 | bytes 

1708 | os.PathLike[str] 

1709 | Iterable[str | bytes | os.PathLike[str]], 

1710 ) -> None: 

1711 """Stage a set of paths. 

1712 

1713 Args: 

1714 fs_paths: List of paths, relative to the repository path 

1715 """ 

1716 return self.get_worktree().stage(fs_paths) 

1717 

1718 @replace_me(remove_in="0.26.0") 

1719 def unstage(self, fs_paths: Sequence[str]) -> None: 

1720 """Unstage specific file in the index. 

1721 

1722 Args: 

1723 fs_paths: a list of files to unstage, 

1724 relative to the repository path. 

1725 """ 

1726 return self.get_worktree().unstage(fs_paths) 

1727 

    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function (currently unused by this
            implementation; accepted for API compatibility)
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
          ValueError: if both ``bare`` and ``checkout`` are requested
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the named remote of the clone.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror local branches (as remote-tracking refs) and tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Release the partially-initialized clone before re-raising.
                target.close()
                raise
        except BaseException:
            # Undo directory creation on failure, but only if we created it.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1822 

1823 @replace_me(remove_in="0.26.0") 

1824 def reset_index(self, tree: bytes | None = None) -> None: 

1825 """Reset the index back to a specific tree. 

1826 

1827 Args: 

1828 tree: Tree SHA to reset to, None for current HEAD tree. 

1829 """ 

1830 return self.get_worktree().reset_index(tree) 

1831 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Supported prefixes: ``gitdir:``, ``gitdir/i:`` (case-insensitive
        variant) and ``onbranch:``.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Unanchored patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # Trailing slash means "this directory and everything below".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern; False when HEAD is
              detached or cannot be resolved
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1919 

1920 def get_worktree_config(self) -> "ConfigFile": 

1921 """Get the worktree-specific config. 

1922 

1923 Returns: 

1924 ConfigFile object for the worktree config 

1925 """ 

1926 from .config import ConfigFile 

1927 

1928 path = os.path.join(self.commondir(), "config.worktree") 

1929 try: 

1930 # Pass condition matchers for includeIf evaluation 

1931 condition_matchers = self._get_config_condition_matchers() 

1932 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1933 except FileNotFoundError: 

1934 cf = ConfigFile() 

1935 cf.path = path 

1936 return cf 

1937 

1938 def get_config(self) -> "ConfigFile": 

1939 """Retrieve the config object. 

1940 

1941 Returns: `ConfigFile` object for the ``.git/config`` file. 

1942 """ 

1943 from .config import ConfigFile 

1944 

1945 path = os.path.join(self._commondir, "config") 

1946 try: 

1947 # Pass condition matchers for includeIf evaluation 

1948 condition_matchers = self._get_config_condition_matchers() 

1949 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1950 except FileNotFoundError: 

1951 ret = ConfigFile() 

1952 ret.path = path 

1953 return ret 

1954 

1955 def get_rebase_state_manager(self) -> "RebaseStateManager": 

1956 """Get the appropriate rebase state manager for this repository. 

1957 

1958 Returns: DiskRebaseStateManager instance 

1959 """ 

1960 import os 

1961 

1962 from .rebase import DiskRebaseStateManager 

1963 

1964 path = os.path.join(self.controldir(), "rebase-merge") 

1965 return DiskRebaseStateManager(path) 

1966 

1967 def get_description(self) -> bytes | None: 

1968 """Retrieve the description of this repository. 

1969 

1970 Returns: Description as bytes or None. 

1971 """ 

1972 path = os.path.join(self._controldir, "description") 

1973 try: 

1974 with GitFile(path, "rb") as f: 

1975 return f.read() 

1976 except FileNotFoundError: 

1977 return None 

1978 

1979 def __repr__(self) -> str: 

1980 """Return string representation of this repository.""" 

1981 return f"<Repo at {self.path!r}>" 

1982 

1983 def set_description(self, description: bytes) -> None: 

1984 """Set the description for this repository. 

1985 

1986 Args: 

1987 description: Text to set as description for this repository. 

1988 """ 

1989 self._put_named_file("description", description) 

1990 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: "StackedConfig | None" = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
        shared_repository: str | bool | None = None,
    ) -> "Repo":
        """Shared initialization logic for bare and non-bare repositories.

        Creates the control-directory layout and object store, points HEAD
        at the default branch, and writes the initial config files.

        Args:
          path: Repository path (for bare repos this equals ``controldir``)
          controldir: Control directory to populate
          bare: Whether the repository is bare
          object_store: Optional pre-built object store
          config: Config used to look up ``init.defaultBranch`` when
            ``default_branch`` is not given
          default_branch: Branch name HEAD should point at
          symlinks: Whether to support symlinks
          format: Repository format version
          shared_repository: Shared repository setting (group, all, umask,
            or octal permission string)
        Returns: The newly created `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)

        # Determine shared repository permissions early
        file_mode: int | None = None
        dir_mode: int | None = None
        if shared_repository is not None:
            file_mode, dir_mode = parse_shared_repository(shared_repository)

        # Create base directories with appropriate permissions
        for d in BASE_DIRECTORIES:
            dir_path = os.path.join(controldir, *d)
            os.mkdir(dir_path)
            if dir_mode is not None:
                os.chmod(dir_path, dir_mode)

        if object_store is None:
            object_store = DiskObjectStore.init(
                os.path.join(controldir, OBJECTDIR),
                file_mode=file_mode,
                dir_mode=dir_mode,
            )
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                # Fall back to the library-wide default branch name.
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", local_branch_name(default_branch))
        ret._init_files(
            bare=bare,
            symlinks=symlinks,
            format=format,
            shared_repository=shared_repository,
        )
        return ret

2048 

2049 @classmethod 

2050 def init( 

2051 cls, 

2052 path: str | bytes | os.PathLike[str], 

2053 *, 

2054 mkdir: bool = False, 

2055 config: "StackedConfig | None" = None, 

2056 default_branch: bytes | None = None, 

2057 symlinks: bool | None = None, 

2058 format: int | None = None, 

2059 shared_repository: str | bool | None = None, 

2060 ) -> "Repo": 

2061 """Create a new repository. 

2062 

2063 Args: 

2064 path: Path in which to create the repository 

2065 mkdir: Whether to create the directory 

2066 config: Configuration object 

2067 default_branch: Default branch name 

2068 symlinks: Whether to support symlinks 

2069 format: Repository format version (defaults to 0) 

2070 shared_repository: Shared repository setting (group, all, umask, or octal) 

2071 Returns: `Repo` instance 

2072 """ 

2073 path = os.fspath(path) 

2074 if isinstance(path, bytes): 

2075 path = os.fsdecode(path) 

2076 if mkdir: 

2077 os.mkdir(path) 

2078 controldir = os.path.join(path, CONTROLDIR) 

2079 os.mkdir(controldir) 

2080 _set_filesystem_hidden(controldir) 

2081 return cls._init_maybe_bare( 

2082 path, 

2083 controldir, 

2084 False, 

2085 config=config, 

2086 default_branch=default_branch, 

2087 symlinks=symlinks, 

2088 format=format, 

2089 shared_repository=shared_repository, 

2090 ) 

2091 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: str | bytes | os.PathLike[str],
        main_repo: "Repo",
        identifier: str | None = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier (defaults to the basename of
            ``path``)
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The worktree's ".git" is a plain file pointing at its control dir.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

        # Get shared repository permissions from main repository
        _, dir_mode = main_repo._get_shared_repository_permissions()

        # Create directories with appropriate permissions
        try:
            os.mkdir(main_worktreesdir)
            if dir_mode is not None:
                os.chmod(main_worktreesdir, dir_mode)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
            if dir_mode is not None:
                os.chmod(worktree_controldir, dir_mode)
        except FileExistsError:
            pass
        # Back-pointer from the worktree control dir to its gitdir file.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # The common dir (objects, refs, ...) is shared with the main repo.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        # Populate the new worktree's index from HEAD.
        r.get_worktree().reset_index()
        return r

2149 

2150 @classmethod 

2151 def init_bare( 

2152 cls, 

2153 path: str | bytes | os.PathLike[str], 

2154 *, 

2155 mkdir: bool = False, 

2156 object_store: PackBasedObjectStore | None = None, 

2157 config: "StackedConfig | None" = None, 

2158 default_branch: bytes | None = None, 

2159 format: int | None = None, 

2160 shared_repository: str | bool | None = None, 

2161 ) -> "Repo": 

2162 """Create a new bare repository. 

2163 

2164 ``path`` should already exist and be an empty directory. 

2165 

2166 Args: 

2167 path: Path to create bare repository in 

2168 mkdir: Whether to create the directory 

2169 object_store: Object store to use 

2170 config: Configuration object 

2171 default_branch: Default branch name 

2172 format: Repository format version (defaults to 0) 

2173 shared_repository: Shared repository setting (group, all, umask, or octal) 

2174 Returns: a `Repo` instance 

2175 """ 

2176 path = os.fspath(path) 

2177 if isinstance(path, bytes): 

2178 path = os.fsdecode(path) 

2179 if mkdir: 

2180 os.mkdir(path) 

2181 return cls._init_maybe_bare( 

2182 path, 

2183 path, 

2184 True, 

2185 object_store=object_store, 

2186 config=config, 

2187 default_branch=default_branch, 

2188 format=format, 

2189 shared_repository=shared_repository, 

2190 ) 

2191 

2192 create = init_bare 

2193 

2194 def close(self) -> None: 

2195 """Close any files opened by this repository.""" 

2196 self.object_store.close() 

2197 # Clean up filter context if it was created 

2198 if self.filter_context is not None: 

2199 self.filter_context.close() 

2200 self.filter_context = None 

2201 

    def __enter__(self) -> "Repo":
        """Enter context manager.

        Returns: this repository, so ``with Repo(...) as r`` binds ``r``.
        """
        return self

2205 

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager and close repository.

        Args:
          exc_type: Exception type, if an exception occurred.
          exc_val: Exception instance, if an exception occurred.
          exc_tb: Traceback, if an exception occurred.

        Returning None means exceptions are never suppressed.
        """
        self.close()

2214 

2215 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

2216 """Read .gitattributes file from working tree. 

2217 

2218 Returns: 

2219 Dictionary mapping file patterns to attributes 

2220 """ 

2221 gitattributes = {} 

2222 gitattributes_path = os.path.join(self.path, ".gitattributes") 

2223 

2224 if os.path.exists(gitattributes_path): 

2225 with open(gitattributes_path, "rb") as f: 

2226 for line in f: 

2227 line = line.strip() 

2228 if not line or line.startswith(b"#"): 

2229 continue 

2230 

2231 parts = line.split() 

2232 if len(parts) < 2: 

2233 continue 

2234 

2235 pattern = parts[0] 

2236 attrs = {} 

2237 

2238 for attr in parts[1:]: 

2239 if attr.startswith(b"-"): 

2240 # Unset attribute 

2241 attrs[attr[1:]] = b"false" 

2242 elif b"=" in attr: 

2243 # Set to value 

2244 key, value = attr.split(b"=", 1) 

2245 attrs[key] = value 

2246 else: 

2247 # Set attribute 

2248 attrs[attr] = b"true" 

2249 

2250 gitattributes[pattern] = attrs 

2251 

2252 return gitattributes 

2253 

2254 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

2255 """Return a BlobNormalizer object.""" 

2256 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry 

2257 

2258 # Get fresh configuration and GitAttributes 

2259 config_stack = self.get_config_stack() 

2260 git_attributes = self.get_gitattributes() 

2261 

2262 # Lazily create FilterContext if needed 

2263 if self.filter_context is None: 

2264 filter_registry = FilterRegistry(config_stack, self) 

2265 self.filter_context = FilterContext(filter_registry) 

2266 else: 

2267 # Refresh the context with current config to handle config changes 

2268 self.filter_context.refresh_config(config_stack) 

2269 

2270 # Return a new FilterBlobNormalizer with the context 

2271 return FilterBlobNormalizer( 

2272 config_stack, git_attributes, filter_context=self.filter_context 

2273 ) 

2274 

    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from the ``.gitattributes`` blob
        in the given tree (or HEAD), from ``.git/info/attributes``, and
        from ``.gitattributes`` in the working directory.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the object it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)

2341 

2342 @replace_me(remove_in="0.26.0") 

2343 def _sparse_checkout_file_path(self) -> str: 

2344 """Return the path of the sparse-checkout file in this repo's control dir.""" 

2345 return self.get_worktree()._sparse_checkout_file_path() 

2346 

2347 @replace_me(remove_in="0.26.0") 

2348 def configure_for_cone_mode(self) -> None: 

2349 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

2350 return self.get_worktree().configure_for_cone_mode() 

2351 

2352 @replace_me(remove_in="0.26.0") 

2353 def infer_cone_mode(self) -> bool: 

2354 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

2355 return self.get_worktree().infer_cone_mode() 

2356 

2357 @replace_me(remove_in="0.26.0") 

2358 def get_sparse_checkout_patterns(self) -> list[str]: 

2359 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

2360 

2361 Returns: 

2362 A list of patterns. Returns an empty list if the file is missing. 

2363 """ 

2364 return self.get_worktree().get_sparse_checkout_patterns() 

2365 

2366 @replace_me(remove_in="0.26.0") 

2367 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None: 

2368 """Write the given sparse-checkout patterns into info/sparse-checkout. 

2369 

2370 Creates the info/ directory if it does not exist. 

2371 

2372 Args: 

2373 patterns: A list of gitignore-style patterns to store. 

2374 """ 

2375 return self.get_worktree().set_sparse_checkout_patterns(patterns) 

2376 

2377 @replace_me(remove_in="0.26.0") 

2378 def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None: 

2379 """Write the given cone-mode directory patterns into info/sparse-checkout. 

2380 

2381 For each directory to include, add an inclusion line that "undoes" the prior 

2382 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

2383 Never add the same line twice. 

2384 """ 

2385 return self.get_worktree().set_cone_mode_patterns(dirs) 

2386 

2387 

2388class MemoryRepo(BaseRepo): 

2389 """Repo that stores refs, objects, and named files in memory. 

2390 

2391 MemoryRepos are always bare: they have no working tree and no index, since 

2392 those have a stronger dependency on the filesystem. 

2393 """ 

2394 

2395 filter_context: "FilterContext | None" 

2396 

2397 def __init__(self) -> None: 

2398 """Create a new repository in memory.""" 

2399 from .config import ConfigFile 

2400 

2401 self._reflog: list[Any] = [] 

2402 refs_container = DictRefsContainer({}, logger=self._append_reflog) 

2403 BaseRepo.__init__(self, MemoryObjectStore(), refs_container) 

2404 self._named_files: dict[str, bytes] = {} 

2405 self.bare = True 

2406 self._config = ConfigFile() 

2407 self._description: bytes | None = None 

2408 self.filter_context = None 

2409 

2410 def _append_reflog( 

2411 self, 

2412 ref: bytes, 

2413 old_sha: bytes | None, 

2414 new_sha: bytes | None, 

2415 committer: bytes | None, 

2416 timestamp: int | None, 

2417 timezone: int | None, 

2418 message: bytes | None, 

2419 ) -> None: 

2420 self._reflog.append( 

2421 (ref, old_sha, new_sha, committer, timestamp, timezone, message) 

2422 ) 

2423 

    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
          description: Bytes to store as the description (kept in memory
            only).
        """
        self._description = description

2431 

    def get_description(self) -> bytes | None:
        """Get the description of this repository.

        Returns:
          Repository description as bytes, or None if it was never set.
        """
        return self._description

2439 

    def _determine_file_mode(self) -> bool:
        """Determine whether file permission bits can be trusted.

        A memory repository has no real file-system to probe; this mirrors
        the platform default (trusted everywhere except Windows).

        Returns: True if permissions can be trusted, False otherwise.
        """
        return sys.platform != "win32"

2446 

    def _determine_symlinks(self) -> bool:
        """Determine whether symlinks can be used.

        The original docstring was copy-pasted from ``_determine_file_mode``
        and talked about permissions; this method answers the symlink
        question (assumed unsupported only on Windows).

        Returns: True if symlinks can be used, False otherwise.
        """
        return sys.platform != "win32"

2453 

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: Bytes to store for the file.
        """
        self._named_files[path] = contents

2462 

2463 def _del_named_file(self, path: str) -> None: 

2464 try: 

2465 del self._named_files[path] 

2466 except KeyError: 

2467 pass 

2468 

2469 def get_named_file( 

2470 self, 

2471 path: str | bytes, 

2472 basedir: str | None = None, 

2473 ) -> BytesIO | None: 

2474 """Get a file from the control dir with a specific name. 

2475 

2476 Although the filename should be interpreted as a filename relative to 

2477 the control dir in a disk-baked Repo, the object returned need not be 

2478 pointing to a file in that location. 

2479 

2480 Args: 

2481 path: The path to the file, relative to the control dir. 

2482 basedir: Optional base directory for the path 

2483 Returns: An open file object, or None if the file does not exist. 

2484 """ 

2485 path_str = path.decode() if isinstance(path, bytes) else path 

2486 contents = self._named_files.get(path_str, None) 

2487 if contents is None: 

2488 return None 

2489 return BytesIO(contents) 

2490 

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Memory repositories never have a working tree or an index.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent

2498 

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: the live in-memory `ConfigFile`; mutations are visible to
          all callers.
        """
        return self._config

2505 

2506 def get_rebase_state_manager(self) -> "RebaseStateManager": 

2507 """Get the appropriate rebase state manager for this repository. 

2508 

2509 Returns: MemoryRebaseStateManager instance 

2510 """ 

2511 from .rebase import MemoryRebaseStateManager 

2512 

2513 return MemoryRebaseStateManager(self) 

2514 

2515 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

2516 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2517 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry 

2518 

2519 # Get fresh configuration and GitAttributes 

2520 config_stack = self.get_config_stack() 

2521 git_attributes = self.get_gitattributes() 

2522 

2523 # Lazily create FilterContext if needed 

2524 if self.filter_context is None: 

2525 filter_registry = FilterRegistry(config_stack, self) 

2526 self.filter_context = FilterContext(filter_registry) 

2527 else: 

2528 # Refresh the context with current config to handle config changes 

2529 self.filter_context.refresh_config(config_stack) 

2530 

2531 # Return a new FilterBlobNormalizer with the context 

2532 return FilterBlobNormalizer( 

2533 config_stack, git_attributes, filter_context=self.filter_context 

2534 ) 

2535 

2536 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes": 

2537 """Read gitattributes for the repository.""" 

2538 from .attrs import GitAttributes 

2539 

2540 # Memory repos don't have working trees or gitattributes files 

2541 # Return empty GitAttributes 

2542 return GitAttributes([]) 

2543 

2544 def close(self) -> None: 

2545 """Close any resources opened by this repository.""" 

2546 # Clean up filter context if it was created 

2547 if self.filter_context is not None: 

2548 self.filter_context.close() 

2549 self.filter_context = None 

2550 

    def do_commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message (bytes, or a callable taking (repo,
            commit) and returning bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use (required for MemoryRepo)
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: if no tree or no message is supplied
        """
        import time

        from .objects import Commit

        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author timestamp defaults to the commit timestamp.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # Resolve the message: callables receive (repo, commit) and must
        # return the final message bytes.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                # Ref exists: extend its history, updating atomically.
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet: create it if still absent.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id

2680 

2681 @classmethod 

2682 def init_bare( 

2683 cls, 

2684 objects: Iterable[ShaFile], 

2685 refs: Mapping[bytes, bytes], 

2686 format: int | None = None, 

2687 ) -> "MemoryRepo": 

2688 """Create a new bare repository in memory. 

2689 

2690 Args: 

2691 objects: Objects for the new repository, 

2692 as iterable 

2693 refs: Refs as dictionary, mapping names 

2694 to object SHA1s 

2695 format: Repository format version (defaults to 0) 

2696 """ 

2697 ret = cls() 

2698 for obj in objects: 

2699 ret.object_store.add_object(obj) 

2700 for refname, sha in refs.items(): 

2701 ret.refs.add_if_new(refname, sha) 

2702 ret._init_files(bare=True, format=format) 

2703 return ret