Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1084 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32__all__ = [ 

33 "BASE_DIRECTORIES", 

34 "COMMONDIR", 

35 "CONTROLDIR", 

36 "DEFAULT_BRANCH", 

37 "DEFAULT_OFS_DELTA", 

38 "GITDIR", 

39 "INDEX_FILENAME", 

40 "OBJECTDIR", 

41 "REFSDIR", 

42 "REFSDIR_HEADS", 

43 "REFSDIR_TAGS", 

44 "WORKTREES", 

45 "BaseRepo", 

46 "DefaultIdentityNotFound", 

47 "InvalidUserIdentity", 

48 "MemoryRepo", 

49 "ParentsProvider", 

50 "Repo", 

51 "UnsupportedExtension", 

52 "UnsupportedVersion", 

53 "check_user_identity", 

54 "get_user_identity", 

55 "parse_graftpoints", 

56 "parse_shared_repository", 

57 "read_gitfile", 

58 "serialize_graftpoints", 

59] 

60 

61import os 

62import stat 

63import sys 

64import time 

65import warnings 

66from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence 

67from io import BytesIO 

68from types import TracebackType 

69from typing import ( 

70 TYPE_CHECKING, 

71 Any, 

72 BinaryIO, 

73 TypeVar, 

74) 

75 

76if TYPE_CHECKING: 

77 # There are no circular imports here, but we try to defer imports as long 

78 # as possible to reduce start-up time for anything that doesn't need 

79 # these imports. 

80 from .attrs import GitAttributes 

81 from .config import ConditionMatcher, ConfigFile, StackedConfig 

82 from .diff_tree import RenameDetector 

83 from .filters import FilterBlobNormalizer, FilterContext 

84 from .index import Index 

85 from .notes import Notes 

86 from .object_store import BaseObjectStore, GraphWalker 

87 from .pack import UnpackedObject 

88 from .rebase import RebaseStateManager 

89 from .walk import Walker 

90 from .worktree import WorkTree 

91 

92from . import reflog, replace_me 

93from .errors import ( 

94 NoIndexPresent, 

95 NotBlobError, 

96 NotCommitError, 

97 NotGitRepository, 

98 NotTagError, 

99 NotTreeError, 

100 RefFormatError, 

101) 

102from .file import GitFile 

103from .hooks import ( 

104 CommitMsgShellHook, 

105 Hook, 

106 PostCommitShellHook, 

107 PostReceiveShellHook, 

108 PreCommitShellHook, 

109) 

110from .object_store import ( 

111 DiskObjectStore, 

112 MemoryObjectStore, 

113 MissingObjectFinder, 

114 ObjectStoreGraphWalker, 

115 PackBasedObjectStore, 

116 PackCapableObjectStore, 

117 find_shallow, 

118 peel_sha, 

119) 

120from .objects import ( 

121 Blob, 

122 Commit, 

123 ObjectID, 

124 RawObjectID, 

125 ShaFile, 

126 Tag, 

127 Tree, 

128 check_hexsha, 

129 valid_hexsha, 

130) 

131from .pack import generate_unpacked_objects 

132from .refs import ( 

133 HEADREF, 

134 LOCAL_TAG_PREFIX, # noqa: F401 

135 SYMREF, # noqa: F401 

136 DictRefsContainer, 

137 DiskRefsContainer, 

138 Ref, 

139 RefsContainer, 

140 _set_default_branch, 

141 _set_head, 

142 _set_origin_head, 

143 check_ref_format, # noqa: F401 

144 extract_branch_name, 

145 is_per_worktree_ref, 

146 local_branch_name, 

147 read_packed_refs, # noqa: F401 

148 read_packed_refs_with_peeled, # noqa: F401 

149 write_packed_refs, # noqa: F401 

150) 

151 

152CONTROLDIR = ".git" 

153OBJECTDIR = "objects" 

154DEFAULT_OFS_DELTA = True 

155 

156T = TypeVar("T", bound="ShaFile") 

157REFSDIR = "refs" 

158REFSDIR_TAGS = "tags" 

159REFSDIR_HEADS = "heads" 

160INDEX_FILENAME = "index" 

161COMMONDIR = "commondir" 

162GITDIR = "gitdir" 

163WORKTREES = "worktrees" 

164 

165BASE_DIRECTORIES = [ 

166 ["branches"], 

167 [REFSDIR], 

168 [REFSDIR, REFSDIR_TAGS], 

169 [REFSDIR, REFSDIR_HEADS], 

170 ["hooks"], 

171 ["info"], 

172] 

173 

174DEFAULT_BRANCH = b"master" 

175 

176 

class InvalidUserIdentity(Exception):
    """Raised when a user identity is not of the form 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Remember the malformed identity string.

        Args:
            identity: The identity text that failed validation.
        """
        self.identity = identity

183 

184 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined."""

187 

188 

189# TODO(jelmer): Cache? 

190def _get_default_identity() -> tuple[str, str]: 

191 import socket 

192 

193 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

194 username = os.environ.get(name) 

195 if username: 

196 break 

197 else: 

198 username = None 

199 

200 try: 

201 import pwd 

202 except ImportError: 

203 fullname = None 

204 else: 

205 try: 

206 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore] 

207 except KeyError: 

208 fullname = None 

209 else: 

210 if getattr(entry, "gecos", None): 

211 fullname = entry.pw_gecos.split(",")[0] 

212 else: 

213 fullname = None 

214 if username is None: 

215 username = entry.pw_name 

216 if not fullname: 

217 if username is None: 

218 raise DefaultIdentityNotFound("no username found") 

219 fullname = username 

220 email = os.environ.get("EMAIL") 

221 if email is None: 

222 if username is None: 

223 raise DefaultIdentityNotFound("no username found") 

224 email = f"{username}@{socket.gethostname()}" 

225 return (fullname, email) 

226 

227 

228def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes: 

229 """Determine the identity to use for new commits. 

230 

231 If kind is set, this first checks 

232 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL. 

233 

234 If those variables are not set, then it will fall back 

235 to reading the user.name and user.email settings from 

236 the specified configuration. 

237 

238 If that also fails, then it will fall back to using 

239 the current users' identity as obtained from the host 

240 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f). 

241 

242 Args: 

243 config: Configuration stack to read from 

244 kind: Optional kind to return identity for, 

245 usually either "AUTHOR" or "COMMITTER". 

246 

247 Returns: 

248 A user identity 

249 """ 

250 user: bytes | None = None 

251 email: bytes | None = None 

252 if kind: 

253 user_uc = os.environ.get("GIT_" + kind + "_NAME") 

254 if user_uc is not None: 

255 user = user_uc.encode("utf-8") 

256 email_uc = os.environ.get("GIT_" + kind + "_EMAIL") 

257 if email_uc is not None: 

258 email = email_uc.encode("utf-8") 

259 if user is None: 

260 try: 

261 user = config.get(("user",), "name") 

262 except KeyError: 

263 user = None 

264 if email is None: 

265 try: 

266 email = config.get(("user",), "email") 

267 except KeyError: 

268 email = None 

269 default_user, default_email = _get_default_identity() 

270 if user is None: 

271 user = default_user.encode("utf-8") 

272 if email is None: 

273 email = default_email.encode("utf-8") 

274 if email.startswith(b"<") and email.endswith(b">"): 

275 email = email[1:-1] 

276 return user + b" <" + email + b">" 

277 

278 

def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``name <email>`` and contains neither NUL
    bytes nor newlines.

    Args:
        identity: User identity bytestring

    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    _name, sep, remainder = identity.partition(b" <")
    if (
        not sep
        or b">" not in remainder
        or b"\0" in identity
        or b"\n" in identity
    ):
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))

295 

296 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[ObjectID, list[ObjectID]]:
    """Convert a list of graftpoints into a dict.

    Args:
        graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[ObjectID, list[ObjectID]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = ObjectID(fields[0])
        if len(fields) == 2:
            parents = [ObjectID(part) for part in fields[1].split()]
        else:
            parents = []
        # Validate every SHA before recording the graft.
        for sha in (commit, *parents):
            check_hexsha(sha, "Invalid graftpoint")
        grafts[commit] = parents
    return grafts

328 

329 

def serialize_graftpoints(graftpoints: Mapping[ObjectID, Sequence[ObjectID]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    lines = []
    for commit, parents in graftpoints.items():
        # A graft without parents is serialized as the bare commit SHA.
        lines.append(b" ".join([commit, *parents]) if parents else commit)
    return b"\n".join(lines)

349 

350 

351def _set_filesystem_hidden(path: str) -> None: 

352 """Mark path as to be hidden if supported by platform and filesystem. 

353 

354 On win32 uses SetFileAttributesW api: 

355 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

356 """ 

357 if sys.platform == "win32": 

358 import ctypes 

359 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

360 

361 FILE_ATTRIBUTE_HIDDEN = 2 

362 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

363 ("SetFileAttributesW", ctypes.windll.kernel32) 

364 ) 

365 

366 if isinstance(path, bytes): 

367 path = os.fsdecode(path) 

368 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

369 pass # Could raise or log `ctypes.WinError()` here 

370 

371 # Could implement other platform specific filesystem hiding here 

372 

373 

374def parse_shared_repository( 

375 value: str | bytes | bool, 

376) -> tuple[int | None, int | None]: 

377 """Parse core.sharedRepository configuration value. 

378 

379 Args: 

380 value: Configuration value (string, bytes, or boolean) 

381 

382 Returns: 

383 tuple of (file_mask, directory_mask) or (None, None) if not shared 

384 

385 The masks are permission bits to apply via chmod. 

386 """ 

387 if isinstance(value, bytes): 

388 value = value.decode("utf-8", errors="replace") 

389 

390 # Handle boolean values 

391 if isinstance(value, bool): 

392 if value: 

393 # true = group (same as "group") 

394 return (0o664, 0o2775) 

395 else: 

396 # false = umask (use system umask, no adjustment) 

397 return (None, None) 

398 

399 # Handle string values 

400 value_lower = value.lower() 

401 

402 if value_lower in ("false", "0", ""): 

403 # Use umask (no adjustment) 

404 return (None, None) 

405 

406 if value_lower in ("true", "1", "group"): 

407 # Group writable (with setgid bit) 

408 return (0o664, 0o2775) 

409 

410 if value_lower in ("all", "world", "everybody", "2"): 

411 # World readable/writable (with setgid bit) 

412 return (0o666, 0o2777) 

413 

414 if value_lower == "umask": 

415 # Explicitly use umask 

416 return (None, None) 

417 

418 # Try to parse as octal 

419 if value.startswith("0"): 

420 try: 

421 mode = int(value, 8) 

422 # For directories, add execute bits where read bits are set 

423 # and add setgid bit for shared repositories 

424 dir_mode = mode | 0o2000 # Add setgid bit 

425 if mode & 0o004: 

426 dir_mode |= 0o001 

427 if mode & 0o040: 

428 dir_mode |= 0o010 

429 if mode & 0o400: 

430 dir_mode |= 0o100 

431 return (mode, dir_mode) 

432 except ValueError: 

433 pass 

434 

435 # Default to umask for unrecognized values 

436 return (None, None) 

437 

438 

class ParentsProvider:
    """Provider for commit parent information."""

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: dict[ObjectID, list[ObjectID]] | None = None,
        shallows: Iterable[ObjectID] | None = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Graft information (commit SHA -> replacement parents)
            shallows: Shallow commit SHAs
        """
        self.store = store
        # Avoid the mutable-default-argument pitfall: the previous `{}` / `[]`
        # defaults were shared across every ParentsProvider instance, so a
        # mutation through one instance leaked into all others.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: ObjectID, commit: Commit | None = None
    ) -> list[ObjectID]:
        """Get parents for a commit using the parents provider.

        Args:
            commit_id: SHA of the commit to look up
            commit: Optional already-loaded commit object matching commit_id

        Returns:
            List of parent SHAs (grafts and shallows take precedence over
            the recorded parents).
        """
        # Grafts override the real parent list entirely.
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        # Shallow commits have their history cut off.
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        result: list[ObjectID] = commit.parents
        return result

486 

487 

488class BaseRepo: 

489 """Base class for a git repository. 

490 

491 This base class is meant to be used for Repository implementations that e.g. 

492 work on top of a different transport than a standard filesystem path. 

493 

494 Attributes: 

495 object_store: Dictionary-like object for accessing 

496 the objects 

497 refs: Dictionary-like object with the refs in this 

498 repository 

499 """ 

500 

501 def __init__( 

502 self, object_store: "PackCapableObjectStore", refs: RefsContainer 

503 ) -> None: 

504 """Open a repository. 

505 

506 This shouldn't be called directly, but rather through one of the 

507 base classes, such as MemoryRepo or Repo. 

508 

509 Args: 

510 object_store: Object store to use 

511 refs: Refs container to use 

512 """ 

513 self.object_store = object_store 

514 self.refs = refs 

515 

516 self._graftpoints: dict[ObjectID, list[ObjectID]] = {} 

517 self.hooks: dict[str, Hook] = {} 

518 

519 def _determine_file_mode(self) -> bool: 

520 """Probe the file-system to determine whether permissions can be trusted. 

521 

522 Returns: True if permissions can be trusted, False otherwise. 

523 """ 

524 raise NotImplementedError(self._determine_file_mode) 

525 

526 def _determine_symlinks(self) -> bool: 

527 """Probe the filesystem to determine whether symlinks can be created. 

528 

529 Returns: True if symlinks can be created, False otherwise. 

530 """ 

531 # For now, just mimic the old behaviour 

532 return sys.platform != "win32" 

533 

534 def _init_files( 

535 self, 

536 bare: bool, 

537 symlinks: bool | None = None, 

538 format: int | None = None, 

539 shared_repository: str | bool | None = None, 

540 ) -> None: 

541 """Initialize a default set of named files.""" 

542 from .config import ConfigFile 

543 

544 self._put_named_file("description", b"Unnamed repository") 

545 f = BytesIO() 

546 cf = ConfigFile() 

547 if format is None: 

548 format = 0 

549 if format not in (0, 1): 

550 raise ValueError(f"Unsupported repository format version: {format}") 

551 cf.set("core", "repositoryformatversion", str(format)) 

552 if self._determine_file_mode(): 

553 cf.set("core", "filemode", True) 

554 else: 

555 cf.set("core", "filemode", False) 

556 

557 if symlinks is None and not bare: 

558 symlinks = self._determine_symlinks() 

559 

560 if symlinks is False: 

561 cf.set("core", "symlinks", symlinks) 

562 

563 cf.set("core", "bare", bare) 

564 cf.set("core", "logallrefupdates", True) 

565 

566 # Set shared repository if specified 

567 if shared_repository is not None: 

568 if isinstance(shared_repository, bool): 

569 cf.set("core", "sharedRepository", shared_repository) 

570 else: 

571 cf.set("core", "sharedRepository", shared_repository) 

572 

573 cf.write_to_file(f) 

574 self._put_named_file("config", f.getvalue()) 

575 self._put_named_file(os.path.join("info", "exclude"), b"") 

576 

577 def get_named_file(self, path: str) -> BinaryIO | None: 

578 """Get a file from the control dir with a specific name. 

579 

580 Although the filename should be interpreted as a filename relative to 

581 the control dir in a disk-based Repo, the object returned need not be 

582 pointing to a file in that location. 

583 

584 Args: 

585 path: The path to the file, relative to the control dir. 

586 Returns: An open file object, or None if the file does not exist. 

587 """ 

588 raise NotImplementedError(self.get_named_file) 

589 

590 def _put_named_file(self, path: str, contents: bytes) -> None: 

591 """Write a file to the control dir with the given name and contents. 

592 

593 Args: 

594 path: The path to the file, relative to the control dir. 

595 contents: A string to write to the file. 

596 """ 

597 raise NotImplementedError(self._put_named_file) 

598 

599 def _del_named_file(self, path: str) -> None: 

600 """Delete a file in the control directory with the given name.""" 

601 raise NotImplementedError(self._del_named_file) 

602 

603 def open_index(self) -> "Index": 

604 """Open the index for this repository. 

605 

606 Raises: 

607 NoIndexPresent: If no index is present 

608 Returns: The matching `Index` 

609 """ 

610 raise NotImplementedError(self.open_index) 

611 

612 def fetch( 

613 self, 

614 target: "BaseRepo", 

615 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]] 

616 | None = None, 

617 progress: Callable[..., None] | None = None, 

618 depth: int | None = None, 

619 ) -> dict[Ref, ObjectID]: 

620 """Fetch objects into another repository. 

621 

622 Args: 

623 target: The target repository 

624 determine_wants: Optional function to determine what refs to 

625 fetch. 

626 progress: Optional progress function 

627 depth: Optional shallow fetch depth 

628 Returns: The local refs 

629 """ 

630 if determine_wants is None: 

631 determine_wants = target.object_store.determine_wants_all 

632 count, pack_data = self.fetch_pack_data( 

633 determine_wants, 

634 target.get_graph_walker(), 

635 progress=progress, 

636 depth=depth, 

637 ) 

638 target.object_store.add_pack_data(count, pack_data, progress) 

639 return self.get_refs() 

640 

641 def fetch_pack_data( 

642 self, 

643 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]], 

644 graph_walker: "GraphWalker", 

645 progress: Callable[[bytes], None] | None, 

646 *, 

647 get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None, 

648 depth: int | None = None, 

649 ) -> tuple[int, Iterator["UnpackedObject"]]: 

650 """Fetch the pack data required for a set of revisions. 

651 

652 Args: 

653 determine_wants: Function that takes a dictionary with heads 

654 and returns the list of heads to fetch. 

655 graph_walker: Object that can iterate over the list of revisions 

656 to fetch and has an "ack" method that will be called to acknowledge 

657 that a revision is present. 

658 progress: Simple progress function that will be called with 

659 updated progress strings. 

660 get_tagged: Function that returns a dict of pointed-to sha -> 

661 tag sha for including tags. 

662 depth: Shallow fetch depth 

663 Returns: count and iterator over pack data 

664 """ 

665 missing_objects = self.find_missing_objects( 

666 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

667 ) 

668 if missing_objects is None: 

669 return 0, iter([]) 

670 remote_has = missing_objects.get_remote_has() 

671 object_ids = list(missing_objects) 

672 return len(object_ids), generate_unpacked_objects( 

673 self.object_store, object_ids, progress=progress, other_haves=remote_has 

674 ) 

675 

676 def find_missing_objects( 

677 self, 

678 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]], 

679 graph_walker: "GraphWalker", 

680 progress: Callable[[bytes], None] | None, 

681 *, 

682 get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None, 

683 depth: int | None = None, 

684 ) -> MissingObjectFinder | None: 

685 """Fetch the missing objects required for a set of revisions. 

686 

687 Args: 

688 determine_wants: Function that takes a dictionary with heads 

689 and returns the list of heads to fetch. 

690 graph_walker: Object that can iterate over the list of revisions 

691 to fetch and has an "ack" method that will be called to acknowledge 

692 that a revision is present. 

693 progress: Simple progress function that will be called with 

694 updated progress strings. 

695 get_tagged: Function that returns a dict of pointed-to sha -> 

696 tag sha for including tags. 

697 depth: Shallow fetch depth 

698 Returns: iterator over objects, with __len__ implemented 

699 """ 

700 import logging 

701 

702 # Filter out refs pointing to missing objects to avoid errors downstream. 

703 # This makes Dulwich more robust when dealing with broken refs on disk. 

704 # Previously serialize_refs() did this filtering as a side-effect. 

705 all_refs = self.get_refs() 

706 refs: dict[Ref, ObjectID] = {} 

707 for ref, sha in all_refs.items(): 

708 if sha in self.object_store: 

709 refs[ref] = sha 

710 else: 

711 logging.warning( 

712 "ref %s points at non-present sha %s", 

713 ref.decode("utf-8", "replace"), 

714 sha.decode("ascii"), 

715 ) 

716 

717 wants = determine_wants(refs, depth) 

718 if not isinstance(wants, list): 

719 raise TypeError("determine_wants() did not return a list") 

720 

721 current_shallow = set(getattr(graph_walker, "shallow", set())) 

722 

723 if depth not in (None, 0): 

724 assert depth is not None 

725 shallow, not_shallow = find_shallow(self.object_store, wants, depth) 

726 # Only update if graph_walker has shallow attribute 

727 if hasattr(graph_walker, "shallow"): 

728 graph_walker.shallow.update(shallow - not_shallow) 

729 new_shallow = graph_walker.shallow - current_shallow 

730 unshallow = not_shallow & current_shallow 

731 setattr(graph_walker, "unshallow", unshallow) 

732 if hasattr(graph_walker, "update_shallow"): 

733 graph_walker.update_shallow(new_shallow, unshallow) 

734 else: 

735 unshallow = getattr(graph_walker, "unshallow", set()) 

736 

737 if wants == []: 

738 # TODO(dborowitz): find a way to short-circuit that doesn't change 

739 # this interface. 

740 

741 if getattr(graph_walker, "shallow", set()) or unshallow: 

742 # Do not send a pack in shallow short-circuit path 

743 return None 

744 

745 # Return an actual MissingObjectFinder with empty wants 

746 return MissingObjectFinder( 

747 self.object_store, 

748 haves=[], 

749 wants=[], 

750 ) 

751 

752 # If the graph walker is set up with an implementation that can 

753 # ACK/NAK to the wire, it will write data to the client through 

754 # this call as a side-effect. 

755 haves = self.object_store.find_common_revisions(graph_walker) 

756 

757 # Deal with shallow requests separately because the haves do 

758 # not reflect what objects are missing 

759 if getattr(graph_walker, "shallow", set()) or unshallow: 

760 # TODO: filter the haves commits from iter_shas. the specific 

761 # commits aren't missing. 

762 haves = [] 

763 

764 parents_provider = ParentsProvider(self.object_store, shallows=current_shallow) 

765 

766 def get_parents(commit: Commit) -> list[ObjectID]: 

767 """Get parents for a commit using the parents provider. 

768 

769 Args: 

770 commit: Commit object 

771 

772 Returns: 

773 List of parent commit SHAs 

774 """ 

775 return parents_provider.get_parents(commit.id, commit) 

776 

777 return MissingObjectFinder( 

778 self.object_store, 

779 haves=haves, 

780 wants=wants, 

781 shallow=getattr(graph_walker, "shallow", set()), 

782 progress=progress, 

783 get_tagged=get_tagged, 

784 get_parents=get_parents, 

785 ) 

786 

787 def generate_pack_data( 

788 self, 

789 have: set[ObjectID], 

790 want: set[ObjectID], 

791 *, 

792 shallow: set[ObjectID] | None = None, 

793 progress: Callable[[str], None] | None = None, 

794 ofs_delta: bool | None = None, 

795 ) -> tuple[int, Iterator["UnpackedObject"]]: 

796 """Generate pack data objects for a set of wants/haves. 

797 

798 Args: 

799 have: List of SHA1s of objects that should not be sent 

800 want: List of SHA1s of objects that should be sent 

801 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits) 

802 ofs_delta: Whether OFS deltas can be included 

803 progress: Optional progress reporting method 

804 """ 

805 if shallow is None: 

806 shallow = self.get_shallow() 

807 return self.object_store.generate_pack_data( 

808 have, 

809 want, 

810 shallow=shallow, 

811 progress=progress, 

812 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA, 

813 ) 

814 

815 def get_graph_walker( 

816 self, heads: list[ObjectID] | None = None 

817 ) -> ObjectStoreGraphWalker: 

818 """Retrieve a graph walker. 

819 

820 A graph walker is used by a remote repository (or proxy) 

821 to find out which objects are present in this repository. 

822 

823 Args: 

824 heads: Repository heads to use (optional) 

825 Returns: A graph walker object 

826 """ 

827 if heads is None: 

828 heads = [ 

829 sha 

830 for sha in self.refs.as_dict(Ref(b"refs/heads")).values() 

831 if sha in self.object_store 

832 ] 

833 parents_provider = ParentsProvider(self.object_store) 

834 return ObjectStoreGraphWalker( 

835 heads, 

836 parents_provider.get_parents, 

837 shallow=self.get_shallow(), 

838 update_shallow=self.update_shallow, 

839 ) 

840 

841 def get_refs(self) -> dict[Ref, ObjectID]: 

842 """Get dictionary with all refs. 

843 

844 Returns: A ``dict`` mapping ref names to SHA1s 

845 """ 

846 return self.refs.as_dict() 

847 

848 def head(self) -> ObjectID: 

849 """Return the SHA1 pointed at by HEAD.""" 

850 # TODO: move this method to WorkTree 

851 return self.refs[HEADREF] 

852 

853 def _get_object(self, sha: bytes, cls: type[T]) -> T: 

854 assert len(sha) in (20, 40) 

855 obj_id = ObjectID(sha) if len(sha) == 40 else RawObjectID(sha) 

856 ret = self.get_object(obj_id) 

857 if not isinstance(ret, cls): 

858 if cls is Commit: 

859 raise NotCommitError(ret.id) 

860 elif cls is Blob: 

861 raise NotBlobError(ret.id) 

862 elif cls is Tree: 

863 raise NotTreeError(ret.id) 

864 elif cls is Tag: 

865 raise NotTagError(ret.id) 

866 else: 

867 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

868 return ret 

869 

870 def get_object(self, sha: ObjectID | RawObjectID) -> ShaFile: 

871 """Retrieve the object with the specified SHA. 

872 

873 Args: 

874 sha: SHA to retrieve 

875 Returns: A ShaFile object 

876 Raises: 

877 KeyError: when the object can not be found 

878 """ 

879 return self.object_store[sha] 

880 

881 def parents_provider(self) -> ParentsProvider: 

882 """Get a parents provider for this repository. 

883 

884 Returns: 

885 ParentsProvider instance configured with grafts and shallows 

886 """ 

887 return ParentsProvider( 

888 self.object_store, 

889 grafts=self._graftpoints, 

890 shallows=self.get_shallow(), 

891 ) 

892 

893 def get_parents( 

894 self, sha: ObjectID, commit: Commit | None = None 

895 ) -> list[ObjectID]: 

896 """Retrieve the parents of a specific commit. 

897 

898 If the specific commit is a graftpoint, the graft parents 

899 will be returned instead. 

900 

901 Args: 

902 sha: SHA of the commit for which to retrieve the parents 

903 commit: Optional commit matching the sha 

904 Returns: List of parents 

905 """ 

906 return self.parents_provider().get_parents(sha, commit) 

907 

908 def get_config(self) -> "ConfigFile": 

909 """Retrieve the config object. 

910 

911 Returns: `ConfigFile` object for the ``.git/config`` file. 

912 """ 

913 raise NotImplementedError(self.get_config) 

914 

915 def get_worktree_config(self) -> "ConfigFile": 

916 """Retrieve the worktree config object.""" 

917 raise NotImplementedError(self.get_worktree_config) 

918 

919 def get_description(self) -> bytes | None: 

920 """Retrieve the description for this repository. 

921 

922 Returns: Bytes with the description of the repository 

923 as set by the user. 

924 """ 

925 raise NotImplementedError(self.get_description) 

926 

927 def set_description(self, description: bytes) -> None: 

928 """Set the description for this repository. 

929 

930 Args: 

931 description: Text to set as description for this repository. 

932 """ 

933 raise NotImplementedError(self.set_description) 

934 

935 def get_rebase_state_manager(self) -> "RebaseStateManager": 

936 """Get the appropriate rebase state manager for this repository. 

937 

938 Returns: RebaseStateManager instance 

939 """ 

940 raise NotImplementedError(self.get_rebase_state_manager) 

941 

942 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

943 """Return a BlobNormalizer object for checkin/checkout operations. 

944 

945 Returns: BlobNormalizer instance 

946 """ 

947 raise NotImplementedError(self.get_blob_normalizer) 

948 

949 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes": 

950 """Read gitattributes for the repository. 

951 

952 Args: 

953 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

954 

955 Returns: 

956 GitAttributes object that can be used to match paths 

957 """ 

958 raise NotImplementedError(self.get_gitattributes) 

959 

960 def get_config_stack(self) -> "StackedConfig": 

961 """Return a config stack for this repository. 

962 

963 This stack accesses the configuration for both this repository 

964 itself (.git/config) and the global configuration, which usually 

965 lives in ~/.gitconfig. 

966 

967 Returns: `Config` instance for this repository 

968 """ 

969 from .config import ConfigFile, StackedConfig 

970 

971 local_config = self.get_config() 

972 backends: list[ConfigFile] = [local_config] 

973 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

974 backends.append(self.get_worktree_config()) 

975 

976 backends += StackedConfig.default_backends() 

977 return StackedConfig(backends, writable=local_config) 

978 

979 def get_shallow(self) -> set[ObjectID]: 

980 """Get the set of shallow commits. 

981 

982 Returns: Set of shallow commits. 

983 """ 

984 f = self.get_named_file("shallow") 

985 if f is None: 

986 return set() 

987 with f: 

988 return {ObjectID(line.strip()) for line in f} 

989 

990 def update_shallow( 

991 self, new_shallow: set[ObjectID] | None, new_unshallow: set[ObjectID] | None 

992 ) -> None: 

993 """Update the list of shallow objects. 

994 

995 Args: 

996 new_shallow: Newly shallow objects 

997 new_unshallow: Newly no longer shallow objects 

998 """ 

999 shallow = self.get_shallow() 

1000 if new_shallow: 

1001 shallow.update(new_shallow) 

1002 if new_unshallow: 

1003 shallow.difference_update(new_unshallow) 

1004 if shallow: 

1005 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

1006 else: 

1007 self._del_named_file("shallow") 

1008 

1009 def get_peeled(self, ref: Ref) -> ObjectID: 

1010 """Get the peeled value of a ref. 

1011 

1012 Args: 

1013 ref: The refname to peel. 

1014 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

1015 intermediate tags; if the original ref does not point to a tag, 

1016 this will equal the original SHA1. 

1017 """ 

1018 cached = self.refs.get_peeled(ref) 

1019 if cached is not None: 

1020 return cached 

1021 return peel_sha(self.object_store, self.refs[ref])[1].id 

1022 

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object bound to this repository's object store and refs
        """
        from .notes import Notes

        return Notes(self.object_store, self.refs)

1033 

    def get_walker(
        self,
        include: Sequence[ObjectID] | None = None,
        exclude: Sequence[ObjectID] | None = None,
        order: str = "date",
        reverse: bool = False,
        max_entries: int | None = None,
        paths: Sequence[bytes] | None = None,
        rename_detector: "RenameDetector | None" = None,
        follow: bool = False,
        since: int | None = None,
        until: int | None = None,
        queue_cls: type | None = None,
    ) -> "Walker":
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]
          exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
          order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
          reverse: If True, reverse the order of output, requiring O(n)
            memory.
          max_entries: The maximum number of entries to yield, or None for
            no limit.
          paths: Iterable of file or subtree paths to show entries for.
          rename_detector: diff.RenameDetector object for detecting
            renames.
          follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
          since: Timestamp to list commits after.
          until: Timestamp to list commits before.
          queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument, the Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker, _CommitTimeQueue

        if include is None:
            include = [self.head()]

        # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
        return Walker(
            self.object_store,
            include,
            exclude=exclude,
            order=order,
            reverse=reverse,
            max_entries=max_entries,
            paths=paths,
            rename_detector=rename_detector,
            follow=follow,
            since=since,
            until=until,
            # Parent lookup is routed through self.get_parents so that
            # repository-level behavior (e.g. graftpoints) applies.
            get_parents=lambda commit: self.get_parents(commit.id, commit),
            queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
        )

1094 

    def __getitem__(self, name: ObjectID | Ref | bytes) -> "ShaFile":
        """Retrieve a Git object by SHA1 or ref.

        Args:
          name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
          KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        if len(name) in (20, 40):
            try:
                # Try as ObjectID/RawObjectID: 40 bytes is a hex sha,
                # 20 bytes a raw binary digest.
                return self.object_store[
                    ObjectID(name) if len(name) == 40 else RawObjectID(name)
                ]
            except (KeyError, ValueError):
                # Not a known object; fall through and try the name as a ref.
                pass
        try:
            return self.object_store[self.refs[Ref(name)]]
        except RefFormatError as exc:
            # Invalid ref syntax is reported as a plain missing key.
            raise KeyError(name) from exc

1118 

1119 def __contains__(self, name: bytes) -> bool: 

1120 """Check if a specific Git object or ref is present. 

1121 

1122 Args: 

1123 name: Git object SHA1 or ref name 

1124 """ 

1125 if len(name) == 20: 

1126 return RawObjectID(name) in self.object_store or Ref(name) in self.refs 

1127 elif len(name) == 40 and valid_hexsha(name): 

1128 return ObjectID(name) in self.object_store or Ref(name) in self.refs 

1129 else: 

1130 return Ref(name) in self.refs 

1131 

1132 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None: 

1133 """Set a ref. 

1134 

1135 Args: 

1136 name: ref name 

1137 value: Ref value - either a ShaFile object, or a hex sha 

1138 """ 

1139 if name.startswith(b"refs/") or name == HEADREF: 

1140 ref_name = Ref(name) 

1141 if isinstance(value, ShaFile): 

1142 self.refs[ref_name] = value.id 

1143 elif isinstance(value, bytes): 

1144 self.refs[ref_name] = ObjectID(value) 

1145 else: 

1146 raise TypeError(value) 

1147 else: 

1148 raise ValueError(name) 

1149 

1150 def __delitem__(self, name: bytes) -> None: 

1151 """Remove a ref. 

1152 

1153 Args: 

1154 name: Name of the ref to remove 

1155 """ 

1156 if name.startswith(b"refs/") or name == HEADREF: 

1157 del self.refs[Ref(name)] 

1158 else: 

1159 raise ValueError(name) 

1160 

    def _get_user_identity(
        self, config: "StackedConfig", kind: str | None = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated shim: delegates to the module-level get_user_identity();
        note that ``kind`` is accepted for compatibility but not forwarded.
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

1170 

1171 def _add_graftpoints( 

1172 self, updated_graftpoints: dict[ObjectID, list[ObjectID]] 

1173 ) -> None: 

1174 """Add or modify graftpoints. 

1175 

1176 Args: 

1177 updated_graftpoints: Dict of commit shas to list of parent shas 

1178 """ 

1179 # Simple validation 

1180 for commit, parents in updated_graftpoints.items(): 

1181 for sha in [commit, *parents]: 

1182 check_hexsha(sha, "Invalid graftpoint") 

1183 

1184 self._graftpoints.update(updated_graftpoints) 

1185 

1186 def _remove_graftpoints(self, to_remove: Sequence[ObjectID] = ()) -> None: 

1187 """Remove graftpoints. 

1188 

1189 Args: 

1190 to_remove: List of commit shas 

1191 """ 

1192 for sha in to_remove: 

1193 del self._graftpoints[sha] 

1194 

1195 def _read_heads(self, name: str) -> list[ObjectID]: 

1196 f = self.get_named_file(name) 

1197 if f is None: 

1198 return [] 

1199 with f: 

1200 return [ObjectID(line.strip()) for line in f.readlines() if line.strip()] 

1201 

    def get_worktree(self) -> "WorkTree":
        """Get the working tree for this repository.

        Abstract on the base class; disk-backed repositories override this.

        Returns:
          WorkTree instance for performing working tree operations

        Raises:
          NotImplementedError: If the repository doesn't support working trees
        """
        raise NotImplementedError(
            "Working tree operations not supported by this repository type"
        )

1214 

    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = HEADREF,
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        Deprecated: delegates to ``get_worktree().commit(...)``; scheduled
        for removal in 0.26.0 (see the ``replace_me`` decorator).

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )

1278 

1279 

def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    contents = f.read()
    prefix = b"gitdir: "
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(prefix) :].rstrip(b"\r\n").decode("utf-8")

1293 

1294 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        # Pass a message to Exception so str(exc) is informative; the bare
        # attribute-only form rendered as an empty string.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1305 

1306 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc) is informative; the bare
        # attribute-only form rendered as an empty string.
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension

1317 

1318 

1319class Repo(BaseRepo): 

1320 """A git repository backed by local disk. 

1321 

1322 To open an existing repository, call the constructor with 

1323 the path of the repository. 

1324 

1325 To create a new repository, use the Repo.init class method. 

1326 

1327 Note that a repository object may hold on to resources such 

1328 as file handles for performance reasons; call .close() to free 

1329 up those resources. 

1330 

1331 Attributes: 

1332 path: Path to the working copy (if it exists) or repository control 

1333 directory (if the repository is bare) 

1334 bare: Whether this is a bare repository 

1335 """ 

1336 

1337 path: str 

1338 bare: bool 

1339 object_store: DiskObjectStore 

1340 filter_context: "FilterContext | None" 

1341 

    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if no repository layout is found at ``root``
          UnsupportedVersion: if core.repositoryformatversion is not 0 or 1
          UnsupportedExtension: if an unknown extensions.* key is set
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file/dir under root means non-bare; an
            # objects/ plus refs/ directory directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a file pointing at the real control dir
                # (linked-worktree / submodule layout).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # "commondir" redirects shared state to the main worktree's
            # control directory.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            # Get shared repository permissions from config
            try:
                shared_value = config.get(("core",), "sharedRepository")
                file_mode, dir_mode = parse_shared_repository(shared_value)
            except KeyError:
                file_mode, dir_mode = None, None

            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR),
                config,
                file_mode=file_mode,
                dir_mode=dir_mode,
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # The shallow file is parsed with the same routine; its entries
        # become graftpoints without parents.
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None

1473 

    def get_worktree(self) -> "WorkTree":
        """Get the working tree for this repository.

        Returns:
          WorkTree instance bound to this repository's working directory
        """
        from .worktree import WorkTree

        return WorkTree(self, self.path)

1483 

    def _write_reflog(
        self,
        ref: bytes,
        old_sha: bytes,
        new_sha: bytes,
        committer: bytes | None,
        timestamp: int | None,
        timezone: int | None,
        message: bytes,
    ) -> None:
        """Append one entry to the reflog for *ref*.

        Args:
          ref: Reference name (e.g. b'HEAD')
          old_sha: Previous value of the ref
          new_sha: New value of the ref
          committer: Identity to record; defaults to the configured user identity
          timestamp: Seconds since the epoch; defaults to the current time
          timezone: Timezone offset; defaults to 0
          message: Reflog message
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)

        # Get shared repository permissions
        file_mode, dir_mode = self._get_shared_repository_permissions()

        # Create directory with appropriate permissions
        parent_dir = os.path.dirname(path)
        # Create directory tree, setting permissions on each level if needed
        parts = []
        current = parent_dir
        while current and not os.path.exists(current):
            parts.append(current)
            current = os.path.dirname(current)
        parts.reverse()
        for part in parts:
            os.mkdir(part)
            if dir_mode is not None:
                # chmod explicitly: mkdir's mode argument is filtered by umask.
                os.chmod(part, dir_mode)
        if committer is None:
            config = self.get_config_stack()
            committer = get_user_identity(config)
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )

        # Set file permissions (open() respects umask, so we need chmod to set the actual mode)
        # Always chmod to ensure correct permissions even if file already existed
        if file_mode is not None:
            os.chmod(path, file_mode)

1534 

1535 def _reflog_path(self, ref: bytes) -> str: 

1536 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1537 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1538 

1539 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1540 return os.path.join(base, "logs", os.fsdecode(ref)) 

1541 

1542 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]: 

1543 """Read reflog entries for a reference. 

1544 

1545 Args: 

1546 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1547 

1548 Yields: 

1549 reflog.Entry objects in chronological order (oldest first) 

1550 """ 

1551 from .reflog import read_reflog 

1552 

1553 path = self._reflog_path(ref) 

1554 try: 

1555 with open(path, "rb") as f: 

1556 yield from read_reflog(f) 

1557 except FileNotFoundError: 

1558 return 

1559 

1560 @classmethod 

1561 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo": 

1562 """Iterate parent directories to discover a repository. 

1563 

1564 Return a Repo object for the first parent directory that looks like a 

1565 Git repository. 

1566 

1567 Args: 

1568 start: The directory to start discovery from (defaults to '.') 

1569 """ 

1570 path = os.path.abspath(start) 

1571 while True: 

1572 try: 

1573 return cls(path) 

1574 except NotGitRepository: 

1575 new_path, _tail = os.path.split(path) 

1576 if new_path == path: # Root reached 

1577 break 

1578 path = new_path 

1579 start_str = os.fspath(start) 

1580 if isinstance(start_str, bytes): 

1581 start_str = start_str.decode("utf-8") 

1582 raise NotGitRepository(f"No git repository was found at {start_str}") 

1583 

    def controldir(self) -> str:
        """Return the path of the control directory (usually ``.git``)."""
        return self._controldir

1587 

    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        return self._commondir

1597 

1598 def _determine_file_mode(self) -> bool: 

1599 """Probe the file-system to determine whether permissions can be trusted. 

1600 

1601 Returns: True if permissions can be trusted, False otherwise. 

1602 """ 

1603 fname = os.path.join(self.path, ".probe-permissions") 

1604 with open(fname, "w") as f: 

1605 f.write("") 

1606 

1607 st1 = os.lstat(fname) 

1608 try: 

1609 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1610 except PermissionError: 

1611 return False 

1612 st2 = os.lstat(fname) 

1613 

1614 os.unlink(fname) 

1615 

1616 mode_differs = st1.st_mode != st2.st_mode 

1617 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1618 

1619 return mode_differs and st2_has_exec 

1620 

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # TODO(jelmer): Actually probe disk / look at filesystem
        # Currently assumes symlinks work everywhere except Windows.
        return sys.platform != "win32"

1628 

1629 def _get_shared_repository_permissions( 

1630 self, 

1631 ) -> tuple[int | None, int | None]: 

1632 """Get shared repository file and directory permissions from config. 

1633 

1634 Returns: 

1635 tuple of (file_mask, directory_mask) or (None, None) if not shared 

1636 """ 

1637 try: 

1638 config = self.get_config() 

1639 value = config.get(("core",), "sharedRepository") 

1640 return parse_shared_repository(value) 

1641 except KeyError: 

1642 return (None, None) 

1643 

1644 def _put_named_file(self, path: str, contents: bytes) -> None: 

1645 """Write a file to the control dir with the given name and contents. 

1646 

1647 Args: 

1648 path: The path to the file, relative to the control dir. 

1649 contents: A string to write to the file. 

1650 """ 

1651 path = path.lstrip(os.path.sep) 

1652 

1653 # Get shared repository permissions 

1654 file_mode, _ = self._get_shared_repository_permissions() 

1655 

1656 # Create file with appropriate permissions 

1657 if file_mode is not None: 

1658 with GitFile( 

1659 os.path.join(self.controldir(), path), "wb", mask=file_mode 

1660 ) as f: 

1661 f.write(contents) 

1662 else: 

1663 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1664 f.write(contents) 

1665 

1666 def _del_named_file(self, path: str) -> None: 

1667 try: 

1668 os.unlink(os.path.join(self.controldir(), path)) 

1669 except FileNotFoundError: 

1670 return 

1671 

1672 def get_named_file( 

1673 self, 

1674 path: str | bytes, 

1675 basedir: str | None = None, 

1676 ) -> BinaryIO | None: 

1677 """Get a file from the control dir with a specific name. 

1678 

1679 Although the filename should be interpreted as a filename relative to 

1680 the control dir in a disk-based Repo, the object returned need not be 

1681 pointing to a file in that location. 

1682 

1683 Args: 

1684 path: The path to the file, relative to the control dir. 

1685 basedir: Optional argument that specifies an alternative to the 

1686 control dir. 

1687 Returns: An open file object, or None if the file does not exist. 

1688 """ 

1689 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1690 # the dumb web serving code. 

1691 if basedir is None: 

1692 basedir = self.controldir() 

1693 if isinstance(path, bytes): 

1694 path = path.decode("utf-8") 

1695 path = path.lstrip(os.path.sep) 

1696 try: 

1697 return open(os.path.join(basedir, path), "rb") 

1698 except FileNotFoundError: 

1699 return None 

1700 

    def index_path(self) -> str:
        """Return path to the index file (``<controldir>/index``)."""
        return os.path.join(self.controldir(), INDEX_FILENAME)

1704 

1705 def open_index(self) -> "Index": 

1706 """Open the index for this repository. 

1707 

1708 Raises: 

1709 NoIndexPresent: If no index is present 

1710 Returns: The matching `Index` 

1711 """ 

1712 from .index import Index 

1713 

1714 if not self.has_index(): 

1715 raise NoIndexPresent 

1716 

1717 # Check for manyFiles feature configuration 

1718 config = self.get_config_stack() 

1719 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1720 skip_hash = False 

1721 index_version = None 

1722 

1723 if many_files: 

1724 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1725 try: 

1726 index_version_str = config.get(b"index", b"version") 

1727 index_version = int(index_version_str) 

1728 except KeyError: 

1729 index_version = 4 # Default to version 4 for manyFiles 

1730 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1731 else: 

1732 # Check for explicit index settings 

1733 try: 

1734 index_version_str = config.get(b"index", b"version") 

1735 index_version = int(index_version_str) 

1736 except KeyError: 

1737 index_version = None 

1738 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1739 

1740 # Get shared repository permissions for index file 

1741 file_mode, _ = self._get_shared_repository_permissions() 

1742 

1743 return Index( 

1744 self.index_path(), 

1745 skip_hash=skip_hash, 

1746 version=index_version, 

1747 file_mode=file_mode, 

1748 ) 

1749 

    def has_index(self) -> bool:
        """Check if an index is present."""
        # Bare repos must never have index files; non-bare repos may have a
        # missing index file, which is treated as empty.
        return not self.bare

1755 

    @replace_me(remove_in="0.26.0")
    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
    ) -> None:
        """Stage a set of paths.

        Deprecated: delegates to ``get_worktree().stage(...)``; scheduled
        for removal in 0.26.0.

        Args:
          fs_paths: List of paths, relative to the repository path
        """
        return self.get_worktree().stage(fs_paths)

1770 

    @replace_me(remove_in="0.26.0")
    def unstage(self, fs_paths: Sequence[str]) -> None:
        """Unstage specific file in the index.

        Deprecated: delegates to ``get_worktree().unstage(...)``; scheduled
        for removal in 0.26.0.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
        """
        return self.get_worktree().unstage(fs_paths)

1780 

    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Point the new repository's named remote back at us.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                target.refs.import_refs(
                    Ref(b"refs/remotes/" + origin),
                    self.refs.as_dict(Ref(b"refs/heads")),
                    message=ref_message,
                )
                target.refs.import_refs(
                    Ref(b"refs/tags"),
                    self.refs.as_dict(Ref(b"refs/tags")),
                    message=ref_message,
                )

                head_chain, origin_sha = self.refs.follow(HEADREF)
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[HEADREF] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Close the partially-initialized target before propagating.
                target.close()
                raise
        except BaseException:
            # Remove the directory we created so a failed clone leaves no
            # residue, then re-raise.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1877 

    @replace_me(remove_in="0.26.0")
    def reset_index(self, tree: ObjectID | None = None) -> None:
        """Reset the index back to a specific tree.

        Deprecated: delegates to ``get_worktree().reset_index(...)``;
        scheduled for removal in 0.26.0.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        return self.get_worktree().reset_index(tree)

1886 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match at any depth.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything under the directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(HEADREF)
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # Detached or unreadable HEAD: no branch to match.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1974 

1975 def get_worktree_config(self) -> "ConfigFile": 

1976 """Get the worktree-specific config. 

1977 

1978 Returns: 

1979 ConfigFile object for the worktree config 

1980 """ 

1981 from .config import ConfigFile 

1982 

1983 path = os.path.join(self.commondir(), "config.worktree") 

1984 try: 

1985 # Pass condition matchers for includeIf evaluation 

1986 condition_matchers = self._get_config_condition_matchers() 

1987 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1988 except FileNotFoundError: 

1989 cf = ConfigFile() 

1990 cf.path = path 

1991 return cf 

1992 

1993 def get_config(self) -> "ConfigFile": 

1994 """Retrieve the config object. 

1995 

1996 Returns: `ConfigFile` object for the ``.git/config`` file. 

1997 """ 

1998 from .config import ConfigFile 

1999 

2000 path = os.path.join(self._commondir, "config") 

2001 try: 

2002 # Pass condition matchers for includeIf evaluation 

2003 condition_matchers = self._get_config_condition_matchers() 

2004 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

2005 except FileNotFoundError: 

2006 ret = ConfigFile() 

2007 ret.path = path 

2008 return ret 

2009 

2010 def get_rebase_state_manager(self) -> "RebaseStateManager": 

2011 """Get the appropriate rebase state manager for this repository. 

2012 

2013 Returns: DiskRebaseStateManager instance 

2014 """ 

2015 import os 

2016 

2017 from .rebase import DiskRebaseStateManager 

2018 

2019 path = os.path.join(self.controldir(), "rebase-merge") 

2020 return DiskRebaseStateManager(path) 

2021 

2022 def get_description(self) -> bytes | None: 

2023 """Retrieve the description of this repository. 

2024 

2025 Returns: Description as bytes or None. 

2026 """ 

2027 path = os.path.join(self._controldir, "description") 

2028 try: 

2029 with GitFile(path, "rb") as f: 

2030 return f.read() 

2031 except FileNotFoundError: 

2032 return None 

2033 

2034 def __repr__(self) -> str: 

2035 """Return string representation of this repository.""" 

2036 return f"<Repo at {self.path!r}>" 

2037 

2038 def set_description(self, description: bytes) -> None: 

2039 """Set the description for this repository. 

2040 

2041 Args: 

2042 description: Text to set as description for this repository. 

2043 """ 

2044 self._put_named_file("description", description) 

2045 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: "StackedConfig | None" = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
        shared_repository: str | bool | None = None,
    ) -> "Repo":
        """Shared implementation behind ``init`` and ``init_bare``.

        Creates the control-directory skeleton, the object store, points
        HEAD at the default branch, and writes the initial control files.

        Args:
          path: Repository path (worktree root, or control dir when bare).
          controldir: Control directory; same as *path* for bare repos.
          bare: Whether the repository is bare.
          object_store: Optional pre-created object store.
          config: Config used to look up ``init.defaultBranch``.
          default_branch: Branch HEAD points at; overrides config lookup.
          symlinks: Whether the worktree supports symlinks.
          format: Repository format version (defaults to 0).
          shared_repository: ``core.sharedRepository``-style permission value.

        Returns: the newly-initialized `Repo` instance.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)

        # Determine shared repository permissions early, so every directory
        # and the object store are created with the right modes.
        file_mode: int | None = None
        dir_mode: int | None = None
        if shared_repository is not None:
            file_mode, dir_mode = parse_shared_repository(shared_repository)

        # Create base directories with appropriate permissions
        for d in BASE_DIRECTORIES:
            dir_path = os.path.join(controldir, *d)
            os.mkdir(dir_path)
            if dir_mode is not None:
                os.chmod(dir_path, dir_mode)

        if object_store is None:
            object_store = DiskObjectStore.init(
                os.path.join(controldir, OBJECTDIR),
                file_mode=file_mode,
                dir_mode=dir_mode,
            )
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                # Not configured; fall back to dulwich's built-in default.
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(HEADREF, local_branch_name(default_branch))
        ret._init_files(
            bare=bare,
            symlinks=symlinks,
            format=format,
            shared_repository=shared_repository,
        )
        return ret

2103 

2104 @classmethod 

2105 def init( 

2106 cls, 

2107 path: str | bytes | os.PathLike[str], 

2108 *, 

2109 mkdir: bool = False, 

2110 config: "StackedConfig | None" = None, 

2111 default_branch: bytes | None = None, 

2112 symlinks: bool | None = None, 

2113 format: int | None = None, 

2114 shared_repository: str | bool | None = None, 

2115 ) -> "Repo": 

2116 """Create a new repository. 

2117 

2118 Args: 

2119 path: Path in which to create the repository 

2120 mkdir: Whether to create the directory 

2121 config: Configuration object 

2122 default_branch: Default branch name 

2123 symlinks: Whether to support symlinks 

2124 format: Repository format version (defaults to 0) 

2125 shared_repository: Shared repository setting (group, all, umask, or octal) 

2126 Returns: `Repo` instance 

2127 """ 

2128 path = os.fspath(path) 

2129 if isinstance(path, bytes): 

2130 path = os.fsdecode(path) 

2131 if mkdir: 

2132 os.mkdir(path) 

2133 controldir = os.path.join(path, CONTROLDIR) 

2134 os.mkdir(controldir) 

2135 _set_filesystem_hidden(controldir) 

2136 return cls._init_maybe_bare( 

2137 path, 

2138 controldir, 

2139 False, 

2140 config=config, 

2141 default_branch=default_branch, 

2142 symlinks=symlinks, 

2143 format=format, 

2144 shared_repository=shared_repository, 

2145 ) 

2146 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: str | bytes | os.PathLike[str],
        main_repo: "Repo",
        identifier: str | None = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier (defaults to basename of *path*)
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        # The worktree's ".git" is a file pointing at its control directory
        # under the main repo's "worktrees" dir (git's linked-worktree layout).
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

        # Get shared repository permissions from main repository
        _, dir_mode = main_repo._get_shared_repository_permissions()

        # Create directories with appropriate permissions; both may already
        # exist when adding additional worktrees.
        try:
            os.mkdir(main_worktreesdir)
            if dir_mode is not None:
                os.chmod(main_worktreesdir, dir_mode)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
            if dir_mode is not None:
                os.chmod(worktree_controldir, dir_mode)
        except FileExistsError:
            pass
        # gitdir/commondir/HEAD wire the worktree control dir back to the
        # main repository and record the starting HEAD.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        # Populate the new worktree's index from its HEAD.
        r.get_worktree().reset_index()
        return r

2204 

2205 @classmethod 

2206 def init_bare( 

2207 cls, 

2208 path: str | bytes | os.PathLike[str], 

2209 *, 

2210 mkdir: bool = False, 

2211 object_store: PackBasedObjectStore | None = None, 

2212 config: "StackedConfig | None" = None, 

2213 default_branch: bytes | None = None, 

2214 format: int | None = None, 

2215 shared_repository: str | bool | None = None, 

2216 ) -> "Repo": 

2217 """Create a new bare repository. 

2218 

2219 ``path`` should already exist and be an empty directory. 

2220 

2221 Args: 

2222 path: Path to create bare repository in 

2223 mkdir: Whether to create the directory 

2224 object_store: Object store to use 

2225 config: Configuration object 

2226 default_branch: Default branch name 

2227 format: Repository format version (defaults to 0) 

2228 shared_repository: Shared repository setting (group, all, umask, or octal) 

2229 Returns: a `Repo` instance 

2230 """ 

2231 path = os.fspath(path) 

2232 if isinstance(path, bytes): 

2233 path = os.fsdecode(path) 

2234 if mkdir: 

2235 os.mkdir(path) 

2236 return cls._init_maybe_bare( 

2237 path, 

2238 path, 

2239 True, 

2240 object_store=object_store, 

2241 config=config, 

2242 default_branch=default_branch, 

2243 format=format, 

2244 shared_repository=shared_repository, 

2245 ) 

2246 

    # Backwards-compatible alias: ``Repo.create`` behaves like ``init_bare``.
    create = init_bare

2248 

2249 def close(self) -> None: 

2250 """Close any files opened by this repository.""" 

2251 self.object_store.close() 

2252 # Clean up filter context if it was created 

2253 if self.filter_context is not None: 

2254 self.filter_context.close() 

2255 self.filter_context = None 

2256 

2257 def __enter__(self) -> "Repo": 

2258 """Enter context manager.""" 

2259 return self 

2260 

2261 def __exit__( 

2262 self, 

2263 exc_type: type[BaseException] | None, 

2264 exc_val: BaseException | None, 

2265 exc_tb: TracebackType | None, 

2266 ) -> None: 

2267 """Exit context manager and close repository.""" 

2268 self.close() 

2269 

2270 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

2271 """Read .gitattributes file from working tree. 

2272 

2273 Returns: 

2274 Dictionary mapping file patterns to attributes 

2275 """ 

2276 gitattributes = {} 

2277 gitattributes_path = os.path.join(self.path, ".gitattributes") 

2278 

2279 if os.path.exists(gitattributes_path): 

2280 with open(gitattributes_path, "rb") as f: 

2281 for line in f: 

2282 line = line.strip() 

2283 if not line or line.startswith(b"#"): 

2284 continue 

2285 

2286 parts = line.split() 

2287 if len(parts) < 2: 

2288 continue 

2289 

2290 pattern = parts[0] 

2291 attrs = {} 

2292 

2293 for attr in parts[1:]: 

2294 if attr.startswith(b"-"): 

2295 # Unset attribute 

2296 attrs[attr[1:]] = b"false" 

2297 elif b"=" in attr: 

2298 # Set to value 

2299 key, value = attr.split(b"=", 1) 

2300 attrs[key] = value 

2301 else: 

2302 # Set attribute 

2303 attrs[attr] = b"true" 

2304 

2305 gitattributes[pattern] = attrs 

2306 

2307 return gitattributes 

2308 

2309 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

2310 """Return a BlobNormalizer object.""" 

2311 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry 

2312 

2313 # Get fresh configuration and GitAttributes 

2314 config_stack = self.get_config_stack() 

2315 git_attributes = self.get_gitattributes() 

2316 

2317 # Lazily create FilterContext if needed 

2318 if self.filter_context is None: 

2319 filter_registry = FilterRegistry(config_stack, self) 

2320 self.filter_context = FilterContext(filter_registry) 

2321 else: 

2322 # Refresh the context with current config to handle config changes 

2323 self.filter_context.refresh_config(config_stack) 

2324 

2325 # Return a new FilterBlobNormalizer with the context 

2326 return FilterBlobNormalizer( 

2327 config_stack, git_attributes, filter_context=self.filter_context 

2328 ) 

2329 

    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected from, in order: the given tree's (or HEAD's)
        ``.gitattributes`` blob, ``.git/info/attributes``, then the working
        tree's top-level ``.gitattributes``; later entries are appended after
        earlier ones.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Tree missing from object store or not actually a tree.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)

2396 

2397 @replace_me(remove_in="0.26.0") 

2398 def _sparse_checkout_file_path(self) -> str: 

2399 """Return the path of the sparse-checkout file in this repo's control dir.""" 

2400 return self.get_worktree()._sparse_checkout_file_path() 

2401 

2402 @replace_me(remove_in="0.26.0") 

2403 def configure_for_cone_mode(self) -> None: 

2404 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

2405 return self.get_worktree().configure_for_cone_mode() 

2406 

2407 @replace_me(remove_in="0.26.0") 

2408 def infer_cone_mode(self) -> bool: 

2409 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

2410 return self.get_worktree().infer_cone_mode() 

2411 

2412 @replace_me(remove_in="0.26.0") 

2413 def get_sparse_checkout_patterns(self) -> list[str]: 

2414 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

2415 

2416 Returns: 

2417 A list of patterns. Returns an empty list if the file is missing. 

2418 """ 

2419 return self.get_worktree().get_sparse_checkout_patterns() 

2420 

2421 @replace_me(remove_in="0.26.0") 

2422 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None: 

2423 """Write the given sparse-checkout patterns into info/sparse-checkout. 

2424 

2425 Creates the info/ directory if it does not exist. 

2426 

2427 Args: 

2428 patterns: A list of gitignore-style patterns to store. 

2429 """ 

2430 return self.get_worktree().set_sparse_checkout_patterns(patterns) 

2431 

2432 @replace_me(remove_in="0.26.0") 

2433 def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None: 

2434 """Write the given cone-mode directory patterns into info/sparse-checkout. 

2435 

2436 For each directory to include, add an inclusion line that "undoes" the prior 

2437 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

2438 Never add the same line twice. 

2439 """ 

2440 return self.get_worktree().set_cone_mode_patterns(dirs) 

2441 

2442 

class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    # Lazily-created filter context; stays None until get_blob_normalizer()
    # first needs it.
    filter_context: "FilterContext | None"

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries accumulate here as plain tuples via _append_reflog.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)
        # Control files (e.g. "description") keyed by their relative path.
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description: bytes | None = None
        self.filter_context = None

2464 

    def _append_reflog(
        self,
        ref: bytes,
        old_sha: bytes | None,
        new_sha: bytes | None,
        committer: bytes | None,
        timestamp: int | None,
        timezone: int | None,
        message: bytes | None,
    ) -> None:
        """Record a reflog entry in memory.

        Used as the logger callback of this repo's DictRefsContainer; each
        update is stored verbatim as a 7-tuple on ``self._reflog``.
        """
        self._reflog.append(
            (ref, old_sha, new_sha, committer, timestamp, timezone, message)
        )

2478 

2479 def set_description(self, description: bytes) -> None: 

2480 """Set the description for this repository. 

2481 

2482 Args: 

2483 description: Text to set as description 

2484 """ 

2485 self._description = description 

2486 

2487 def get_description(self) -> bytes | None: 

2488 """Get the description of this repository. 

2489 

2490 Returns: 

2491 Repository description as bytes 

2492 """ 

2493 return self._description 

2494 

2495 def _determine_file_mode(self) -> bool: 

2496 """Probe the file-system to determine whether permissions can be trusted. 

2497 

2498 Returns: True if permissions can be trusted, False otherwise. 

2499 """ 

2500 return sys.platform != "win32" 

2501 

2502 def _determine_symlinks(self) -> bool: 

2503 """Probe the file-system to determine whether permissions can be trusted. 

2504 

2505 Returns: True if permissions can be trusted, False otherwise. 

2506 """ 

2507 return sys.platform != "win32" 

2508 

2509 def _put_named_file(self, path: str, contents: bytes) -> None: 

2510 """Write a file to the control dir with the given name and contents. 

2511 

2512 Args: 

2513 path: The path to the file, relative to the control dir. 

2514 contents: A string to write to the file. 

2515 """ 

2516 self._named_files[path] = contents 

2517 

2518 def _del_named_file(self, path: str) -> None: 

2519 try: 

2520 del self._named_files[path] 

2521 except KeyError: 

2522 pass 

2523 

2524 def get_named_file( 

2525 self, 

2526 path: str | bytes, 

2527 basedir: str | None = None, 

2528 ) -> BytesIO | None: 

2529 """Get a file from the control dir with a specific name. 

2530 

2531 Although the filename should be interpreted as a filename relative to 

2532 the control dir in a disk-baked Repo, the object returned need not be 

2533 pointing to a file in that location. 

2534 

2535 Args: 

2536 path: The path to the file, relative to the control dir. 

2537 basedir: Optional base directory for the path 

2538 Returns: An open file object, or None if the file does not exist. 

2539 """ 

2540 path_str = path.decode() if isinstance(path, bytes) else path 

2541 contents = self._named_files.get(path_str, None) 

2542 if contents is None: 

2543 return None 

2544 return BytesIO(contents) 

2545 

2546 def open_index(self) -> "Index": 

2547 """Fail to open index for this repo, since it is bare. 

2548 

2549 Raises: 

2550 NoIndexPresent: Raised when no index is present 

2551 """ 

2552 raise NoIndexPresent 

2553 

2554 def get_config(self) -> "ConfigFile": 

2555 """Retrieve the config object. 

2556 

2557 Returns: `ConfigFile` object. 

2558 """ 

2559 return self._config 

2560 

2561 def get_rebase_state_manager(self) -> "RebaseStateManager": 

2562 """Get the appropriate rebase state manager for this repository. 

2563 

2564 Returns: MemoryRebaseStateManager instance 

2565 """ 

2566 from .rebase import MemoryRebaseStateManager 

2567 

2568 return MemoryRebaseStateManager(self) 

2569 

2570 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

2571 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2572 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry 

2573 

2574 # Get fresh configuration and GitAttributes 

2575 config_stack = self.get_config_stack() 

2576 git_attributes = self.get_gitattributes() 

2577 

2578 # Lazily create FilterContext if needed 

2579 if self.filter_context is None: 

2580 filter_registry = FilterRegistry(config_stack, self) 

2581 self.filter_context = FilterContext(filter_registry) 

2582 else: 

2583 # Refresh the context with current config to handle config changes 

2584 self.filter_context.refresh_config(config_stack) 

2585 

2586 # Return a new FilterBlobNormalizer with the context 

2587 return FilterBlobNormalizer( 

2588 config_stack, git_attributes, filter_context=self.filter_context 

2589 ) 

2590 

2591 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes": 

2592 """Read gitattributes for the repository.""" 

2593 from .attrs import GitAttributes 

2594 

2595 # Memory repos don't have working trees or gitattributes files 

2596 # Return empty GitAttributes 

2597 return GitAttributes([]) 

2598 

2599 def close(self) -> None: 

2600 """Close any resources opened by this repository.""" 

2601 # Clean up filter context if it was created 

2602 if self.filter_context is not None: 

2603 self.filter_context.close() 

2604 self.filter_context = None 

2605 

    def do_commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = HEADREF,
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message, either bytes or a callable taking
            (repo, commit) and returning bytes
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use (required for MemoryRepo)
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: if *tree* is missing/malformed or no message is given
        """
        import time

        from .objects import Commit

        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            # Fall back to the configured committer identity.
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # Default timezone is UTC offset 0.
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author time defaults to the commit time.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message gets the partially-built commit to inspect.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                # Compare-and-swap against old_head so a concurrent update
                # of the ref is detected rather than silently overwritten.
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet; create it atomically.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id

2735 

2736 @classmethod 

2737 def init_bare( 

2738 cls, 

2739 objects: Iterable[ShaFile], 

2740 refs: Mapping[Ref, ObjectID], 

2741 format: int | None = None, 

2742 ) -> "MemoryRepo": 

2743 """Create a new bare repository in memory. 

2744 

2745 Args: 

2746 objects: Objects for the new repository, 

2747 as iterable 

2748 refs: Refs as dictionary, mapping names 

2749 to object SHA1s 

2750 format: Repository format version (defaults to 0) 

2751 """ 

2752 ret = cls() 

2753 for obj in objects: 

2754 ret.object_store.add_object(obj) 

2755 for refname, sha in refs.items(): 

2756 ret.refs.add_if_new(refname, sha) 

2757 ret._init_files(bare=True, format=format) 

2758 return ret