Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

999 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence 

38from io import BytesIO 

39from types import TracebackType 

40from typing import ( 

41 TYPE_CHECKING, 

42 Any, 

43 BinaryIO, 

44 Callable, 

45 Optional, 

46 TypeVar, 

47 Union, 

48) 

49 

50if TYPE_CHECKING: 

51 # There are no circular imports here, but we try to defer imports as long 

52 # as possible to reduce start-up time for anything that doesn't need 

53 # these imports. 

54 from .attrs import GitAttributes 

55 from .config import ConditionMatcher, ConfigFile, StackedConfig 

56 from .diff_tree import RenameDetector 

57 from .filters import FilterBlobNormalizer, FilterContext 

58 from .index import Index 

59 from .notes import Notes 

60 from .object_store import BaseObjectStore, GraphWalker 

61 from .pack import UnpackedObject 

62 from .rebase import RebaseStateManager 

63 from .walk import Walker 

64 from .worktree import WorkTree 

65 

66from . import reflog, replace_me 

67from .errors import ( 

68 NoIndexPresent, 

69 NotBlobError, 

70 NotCommitError, 

71 NotGitRepository, 

72 NotTagError, 

73 NotTreeError, 

74 RefFormatError, 

75) 

76from .file import GitFile 

77from .hooks import ( 

78 CommitMsgShellHook, 

79 Hook, 

80 PostCommitShellHook, 

81 PostReceiveShellHook, 

82 PreCommitShellHook, 

83) 

84from .object_store import ( 

85 DiskObjectStore, 

86 MemoryObjectStore, 

87 MissingObjectFinder, 

88 ObjectStoreGraphWalker, 

89 PackBasedObjectStore, 

90 PackCapableObjectStore, 

91 find_shallow, 

92 peel_sha, 

93) 

94from .objects import ( 

95 Blob, 

96 Commit, 

97 ObjectID, 

98 ShaFile, 

99 Tag, 

100 Tree, 

101 check_hexsha, 

102 valid_hexsha, 

103) 

104from .pack import generate_unpacked_objects 

105from .refs import ( 

106 ANNOTATED_TAG_SUFFIX, # noqa: F401 

107 LOCAL_BRANCH_PREFIX, 

108 LOCAL_TAG_PREFIX, # noqa: F401 

109 SYMREF, # noqa: F401 

110 DictRefsContainer, 

111 DiskRefsContainer, 

112 InfoRefsContainer, # noqa: F401 

113 Ref, 

114 RefsContainer, 

115 _set_default_branch, 

116 _set_head, 

117 _set_origin_head, 

118 check_ref_format, # noqa: F401 

119 is_per_worktree_ref, 

120 read_packed_refs, # noqa: F401 

121 read_packed_refs_with_peeled, # noqa: F401 

122 serialize_refs, 

123 write_packed_refs, # noqa: F401 

124) 

125 

# Standard names and locations inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
# Whether OFS (offset) deltas are used in generated packs by default.
DEFAULT_OFS_DELTA = True

# Type variable bound to ShaFile; used by BaseRepo._get_object to express
# that the returned object has the requested concrete type.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
# Worktree-related control files/directories.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created (as path components) inside a fresh control directory.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Name of the branch HEAD points at in a newly initialized repository.
DEFAULT_BRANCH = b"master"

149 

150 

class InvalidUserIdentity(Exception):
    """Raised when a user identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the offending identity string.

        Args:
            identity: The identity text that failed validation.
        """
        self.identity = identity

157 

158 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be derived from the host."""

161 

162 

163# TODO(jelmer): Cache? 

164def _get_default_identity() -> tuple[str, str]: 

165 import socket 

166 

167 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

168 username = os.environ.get(name) 

169 if username: 

170 break 

171 else: 

172 username = None 

173 

174 try: 

175 import pwd 

176 except ImportError: 

177 fullname = None 

178 else: 

179 try: 

180 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore] 

181 except KeyError: 

182 fullname = None 

183 else: 

184 if getattr(entry, "gecos", None): 

185 fullname = entry.pw_gecos.split(",")[0] 

186 else: 

187 fullname = None 

188 if username is None: 

189 username = entry.pw_name 

190 if not fullname: 

191 if username is None: 

192 raise DefaultIdentityNotFound("no username found") 

193 fullname = username 

194 email = os.environ.get("EMAIL") 

195 if email is None: 

196 if username is None: 

197 raise DefaultIdentityNotFound("no username found") 

198 email = f"{username}@{socket.gethostname()}" 

199 return (fullname, email) 

200 

201 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
        config: Configuration stack to read from
        kind: Optional kind to return identity for,
            usually either "AUTHOR" or "COMMITTER".

    Returns:
        A user identity

    Raises:
        DefaultIdentityNotFound: If neither environment, configuration nor
            the host system can supply the missing name/email parts.
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")

    def _from_config(key: str) -> Optional[bytes]:
        # Missing config keys are simply treated as "not set".
        try:
            return config.get(("user",), key)
        except KeyError:
            return None

    if user is None:
        user = _from_config("name")
    if email is None:
        email = _from_config("email")

    # Only consult the host system when something is still missing: the
    # default-identity probe can raise DefaultIdentityNotFound (and does
    # environment/passwd work) that is pointless when both values are
    # already configured.
    if user is None or email is None:
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")

    # Accept "<addr>" style configured emails by stripping the brackets.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

251 

252 

def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``name <email>`` and contains neither NUL
    bytes nor newlines.

    Args:
        identity: User identity bytestring
    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    parts = identity.split(b" <", 1)
    malformed = (
        len(parts) != 2
        or b">" not in parts[1]
        or b"\0" in identity
        or b"\n" in identity
    )
    if malformed:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))

269 

270 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
        graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        # Everything after the commit sha is a whitespace-separated list of
        # replacement parents; a lone sha means "no parents".
        parents = fields[1].split() if len(fields) == 2 else []

        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

302 

303 

def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    lines = [
        (commit + b" " + b" ".join(parents)) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

323 

324 

325def _set_filesystem_hidden(path: str) -> None: 

326 """Mark path as to be hidden if supported by platform and filesystem. 

327 

328 On win32 uses SetFileAttributesW api: 

329 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

330 """ 

331 if sys.platform == "win32": 

332 import ctypes 

333 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

334 

335 FILE_ATTRIBUTE_HIDDEN = 2 

336 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

337 ("SetFileAttributesW", ctypes.windll.kernel32) 

338 ) 

339 

340 if isinstance(path, bytes): 

341 path = os.fsdecode(path) 

342 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

343 pass # Could raise or log `ctypes.WinError()` here 

344 

345 # Could implement other platform specific filesystem hiding here 

346 

347 

class ParentsProvider:
    """Provider for commit parent information.

    Grafts override the recorded parents of a commit, and shallow commits
    are treated as having no parents at all.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict[bytes, list[bytes]]] = None,
        shallows: Iterable[bytes] = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Graft information (commit sha -> replacement parent
                shas); defaults to no grafts.
            shallows: Shallow commit SHAs
        """
        self.store = store
        # Avoid the shared-mutable-default pitfall: create a fresh dict per
        # instance unless the caller supplies one (kept by reference).
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
            commit_id: SHA of the commit to look up.
            commit: Optional pre-loaded commit object matching commit_id,
                used to avoid a store lookup.

        Returns:
            List of parent commit SHAs (empty for shallow commits).
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        parents = commit.parents
        assert isinstance(parents, list)
        return parents

396 

397 

398class BaseRepo: 

399 """Base class for a git repository. 

400 

401 This base class is meant to be used for Repository implementations that e.g. 

402 work on top of a different transport than a standard filesystem path. 

403 

404 Attributes: 

405 object_store: Dictionary-like object for accessing 

406 the objects 

407 refs: Dictionary-like object with the refs in this 

408 repository 

409 """ 

410 

    def __init__(
        self, object_store: "PackCapableObjectStore", refs: RefsContainer
    ) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
            object_store: Object store to use
            refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graft information (commit sha -> fake parents), applied on top of
        # real history by parents_provider(); loaded by subclasses.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        # Hook name -> Hook implementation; populated by subclasses.
        self.hooks: dict[str, Hook] = {}

428 

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # Abstract: concrete subclasses must override. Passing the bound
        # method makes the NotImplementedError name the missing override.
        raise NotImplementedError(self._determine_file_mode)

435 

436 def _determine_symlinks(self) -> bool: 

437 """Probe the filesystem to determine whether symlinks can be created. 

438 

439 Returns: True if symlinks can be created, False otherwise. 

440 """ 

441 # For now, just mimic the old behaviour 

442 return sys.platform != "win32" 

443 

444 def _init_files( 

445 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

446 ) -> None: 

447 """Initialize a default set of named files.""" 

448 from .config import ConfigFile 

449 

450 self._put_named_file("description", b"Unnamed repository") 

451 f = BytesIO() 

452 cf = ConfigFile() 

453 if format is None: 

454 format = 0 

455 if format not in (0, 1): 

456 raise ValueError(f"Unsupported repository format version: {format}") 

457 cf.set("core", "repositoryformatversion", str(format)) 

458 if self._determine_file_mode(): 

459 cf.set("core", "filemode", True) 

460 else: 

461 cf.set("core", "filemode", False) 

462 

463 if symlinks is None and not bare: 

464 symlinks = self._determine_symlinks() 

465 

466 if symlinks is False: 

467 cf.set("core", "symlinks", symlinks) 

468 

469 cf.set("core", "bare", bare) 

470 cf.set("core", "logallrefupdates", True) 

471 cf.write_to_file(f) 

472 self._put_named_file("config", f.getvalue()) 

473 self._put_named_file(os.path.join("info", "exclude"), b"") 

474 

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
            path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        # Abstract: concrete subclasses (disk- or memory-backed) override.
        raise NotImplementedError(self.get_named_file)

487 

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
            path: The path to the file, relative to the control dir.
            contents: A string to write to the file.
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self._put_named_file)

496 

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Args:
            path: The path to the file, relative to the control dir.
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self._del_named_file)

500 

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
            NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        # Abstract: bare/memory repositories raise NoIndexPresent instead.
        raise NotImplementedError(self.open_index)

509 

510 def fetch( 

511 self, 

512 target: "BaseRepo", 

513 determine_wants: Optional[ 

514 Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]] 

515 ] = None, 

516 progress: Optional[Callable[..., None]] = None, 

517 depth: Optional[int] = None, 

518 ) -> dict[bytes, bytes]: 

519 """Fetch objects into another repository. 

520 

521 Args: 

522 target: The target repository 

523 determine_wants: Optional function to determine what refs to 

524 fetch. 

525 progress: Optional progress function 

526 depth: Optional shallow fetch depth 

527 Returns: The local refs 

528 """ 

529 if determine_wants is None: 

530 determine_wants = target.object_store.determine_wants_all 

531 count, pack_data = self.fetch_pack_data( 

532 determine_wants, 

533 target.get_graph_walker(), 

534 progress=progress, 

535 depth=depth, 

536 ) 

537 target.object_store.add_pack_data(count, pack_data, progress) 

538 return self.get_refs() 

539 

540 def fetch_pack_data( 

541 self, 

542 determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]], 

543 graph_walker: "GraphWalker", 

544 progress: Optional[Callable[[bytes], None]], 

545 *, 

546 get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None, 

547 depth: Optional[int] = None, 

548 ) -> tuple[int, Iterator["UnpackedObject"]]: 

549 """Fetch the pack data required for a set of revisions. 

550 

551 Args: 

552 determine_wants: Function that takes a dictionary with heads 

553 and returns the list of heads to fetch. 

554 graph_walker: Object that can iterate over the list of revisions 

555 to fetch and has an "ack" method that will be called to acknowledge 

556 that a revision is present. 

557 progress: Simple progress function that will be called with 

558 updated progress strings. 

559 get_tagged: Function that returns a dict of pointed-to sha -> 

560 tag sha for including tags. 

561 depth: Shallow fetch depth 

562 Returns: count and iterator over pack data 

563 """ 

564 missing_objects = self.find_missing_objects( 

565 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

566 ) 

567 if missing_objects is None: 

568 return 0, iter([]) 

569 remote_has = missing_objects.get_remote_has() 

570 object_ids = list(missing_objects) 

571 return len(object_ids), generate_unpacked_objects( 

572 self.object_store, object_ids, progress=progress, other_haves=remote_has 

573 ) 

574 

    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Optional[Callable[[bytes], None]],
        *,
        get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to
                acknowledge that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
            when nothing should be sent (shallow short-circuit).
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Shallow boundary as known before this negotiation round.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            # Compute which commits become shallow at the requested depth.
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        # Parent lookups must respect the shallow boundary so traversal
        # does not walk past commits the client cannot have.
        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
                commit: Commit object

            Returns:
                List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

670 

671 def generate_pack_data( 

672 self, 

673 have: set[ObjectID], 

674 want: set[ObjectID], 

675 progress: Optional[Callable[[str], None]] = None, 

676 ofs_delta: Optional[bool] = None, 

677 ) -> tuple[int, Iterator["UnpackedObject"]]: 

678 """Generate pack data objects for a set of wants/haves. 

679 

680 Args: 

681 have: List of SHA1s of objects that should not be sent 

682 want: List of SHA1s of objects that should be sent 

683 ofs_delta: Whether OFS deltas can be included 

684 progress: Optional progress reporting method 

685 """ 

686 return self.object_store.generate_pack_data( 

687 have, 

688 want, 

689 shallow=self.get_shallow(), 

690 progress=progress, 

691 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA, 

692 ) 

693 

694 def get_graph_walker( 

695 self, heads: Optional[list[ObjectID]] = None 

696 ) -> ObjectStoreGraphWalker: 

697 """Retrieve a graph walker. 

698 

699 A graph walker is used by a remote repository (or proxy) 

700 to find out which objects are present in this repository. 

701 

702 Args: 

703 heads: Repository heads to use (optional) 

704 Returns: A graph walker object 

705 """ 

706 if heads is None: 

707 heads = [ 

708 sha 

709 for sha in self.refs.as_dict(b"refs/heads").values() 

710 if sha in self.object_store 

711 ] 

712 parents_provider = ParentsProvider(self.object_store) 

713 return ObjectStoreGraphWalker( 

714 heads, 

715 parents_provider.get_parents, 

716 shallow=self.get_shallow(), 

717 update_shallow=self.update_shallow, 

718 ) 

719 

720 def get_refs(self) -> dict[bytes, bytes]: 

721 """Get dictionary with all refs. 

722 

723 Returns: A ``dict`` mapping ref names to SHA1s 

724 """ 

725 return self.refs.as_dict() 

726 

727 def head(self) -> bytes: 

728 """Return the SHA1 pointed at by HEAD.""" 

729 # TODO: move this method to WorkTree 

730 return self.refs[b"HEAD"] 

731 

732 def _get_object(self, sha: bytes, cls: type[T]) -> T: 

733 assert len(sha) in (20, 40) 

734 ret = self.get_object(sha) 

735 if not isinstance(ret, cls): 

736 if cls is Commit: 

737 raise NotCommitError(ret.id) 

738 elif cls is Blob: 

739 raise NotBlobError(ret.id) 

740 elif cls is Tree: 

741 raise NotTreeError(ret.id) 

742 elif cls is Tag: 

743 raise NotTagError(ret.id) 

744 else: 

745 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

746 return ret 

747 

748 def get_object(self, sha: bytes) -> ShaFile: 

749 """Retrieve the object with the specified SHA. 

750 

751 Args: 

752 sha: SHA to retrieve 

753 Returns: A ShaFile object 

754 Raises: 

755 KeyError: when the object can not be found 

756 """ 

757 return self.object_store[sha] 

758 

759 def parents_provider(self) -> ParentsProvider: 

760 """Get a parents provider for this repository. 

761 

762 Returns: 

763 ParentsProvider instance configured with grafts and shallows 

764 """ 

765 return ParentsProvider( 

766 self.object_store, 

767 grafts=self._graftpoints, 

768 shallows=self.get_shallow(), 

769 ) 

770 

771 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]: 

772 """Retrieve the parents of a specific commit. 

773 

774 If the specific commit is a graftpoint, the graft parents 

775 will be returned instead. 

776 

777 Args: 

778 sha: SHA of the commit for which to retrieve the parents 

779 commit: Optional commit matching the sha 

780 Returns: List of parents 

781 """ 

782 return self.parents_provider().get_parents(sha, commit) 

783 

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self.get_config)

790 

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object (``config.worktree``).

        Returns: `ConfigFile` object for the worktree-specific settings.
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self.get_worktree_config)

794 

    def get_description(self) -> Optional[bytes]:
        """Retrieve the description for this repository.

        Returns: Bytes with the description of the repository
            as set by the user.
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self.get_description)

802 

    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
            description: Text to set as description for this repository.
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self.set_description)

810 

    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self.get_rebase_state_manager)

817 

    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Returns: BlobNormalizer instance
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self.get_blob_normalizer)

824 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        # Abstract: concrete subclasses override.
        raise NotImplementedError(self.get_gitattributes)

835 

836 def get_config_stack(self) -> "StackedConfig": 

837 """Return a config stack for this repository. 

838 

839 This stack accesses the configuration for both this repository 

840 itself (.git/config) and the global configuration, which usually 

841 lives in ~/.gitconfig. 

842 

843 Returns: `Config` instance for this repository 

844 """ 

845 from .config import ConfigFile, StackedConfig 

846 

847 local_config = self.get_config() 

848 backends: list[ConfigFile] = [local_config] 

849 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

850 backends.append(self.get_worktree_config()) 

851 

852 backends += StackedConfig.default_backends() 

853 return StackedConfig(backends, writable=local_config) 

854 

855 def get_shallow(self) -> set[ObjectID]: 

856 """Get the set of shallow commits. 

857 

858 Returns: Set of shallow commits. 

859 """ 

860 f = self.get_named_file("shallow") 

861 if f is None: 

862 return set() 

863 with f: 

864 return {line.strip() for line in f} 

865 

866 def update_shallow( 

867 self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]] 

868 ) -> None: 

869 """Update the list of shallow objects. 

870 

871 Args: 

872 new_shallow: Newly shallow objects 

873 new_unshallow: Newly no longer shallow objects 

874 """ 

875 shallow = self.get_shallow() 

876 if new_shallow: 

877 shallow.update(new_shallow) 

878 if new_unshallow: 

879 shallow.difference_update(new_unshallow) 

880 if shallow: 

881 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

882 else: 

883 self._del_named_file("shallow") 

884 

885 def get_peeled(self, ref: Ref) -> ObjectID: 

886 """Get the peeled value of a ref. 

887 

888 Args: 

889 ref: The refname to peel. 

890 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

891 intermediate tags; if the original ref does not point to a tag, 

892 this will equal the original SHA1. 

893 """ 

894 cached = self.refs.get_peeled(ref) 

895 if cached is not None: 

896 return cached 

897 return peel_sha(self.object_store, self.refs[ref])[1].id 

898 

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
            Notes object for accessing notes
        """
        # Imported lazily to keep module import time low.
        from .notes import Notes

        return Notes(self.object_store, self.refs)

909 

    def get_walker(
        self,
        include: Optional[Sequence[bytes]] = None,
        exclude: Optional[Sequence[bytes]] = None,
        order: str = "date",
        reverse: bool = False,
        max_entries: Optional[int] = None,
        paths: Optional[Sequence[bytes]] = None,
        rename_detector: Optional["RenameDetector"] = None,
        follow: bool = False,
        since: Optional[int] = None,
        until: Optional[int] = None,
        queue_cls: Optional[type] = None,
    ) -> "Walker":
        """Obtain a walker for this repository.

        Args:
            include: Iterable of SHAs of commits to include along with their
                ancestors. Defaults to [HEAD]
            exclude: Iterable of SHAs of commits to exclude along with their
                ancestors, overriding includes.
            order: ORDER_* constant specifying the order of results.
                Anything other than ORDER_DATE may result in O(n) memory usage.
            reverse: If True, reverse the order of output, requiring O(n)
                memory.
            max_entries: The maximum number of entries to yield, or None for
                no limit.
            paths: Iterable of file or subtree paths to show entries for.
            rename_detector: diff.RenameDetector object for detecting
                renames.
            follow: If True, follow path across renames/copies. Forces a
                default rename_detector.
            since: Timestamp to list commits after.
            until: Timestamp to list commits before.
            queue_cls: A class to use for a queue of commits, supporting the
                iterator protocol. The constructor takes a single argument, the
                Walker. Defaults to _CommitTimeQueue.

        Returns: A `Walker` object
        """
        # Imported lazily to keep module import time low.
        from .walk import Walker, _CommitTimeQueue

        if include is None:
            include = [self.head()]

        # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
        return Walker(
            self.object_store,
            include,
            exclude=exclude,
            order=order,
            reverse=reverse,
            max_entries=max_entries,
            paths=paths,
            rename_detector=rename_detector,
            follow=follow,
            since=since,
            until=until,
            # Parent lookup honours graftpoints and shallows via get_parents.
            get_parents=lambda commit: self.get_parents(commit.id, commit),
            queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
        )

971 

972 def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile": 

973 """Retrieve a Git object by SHA1 or ref. 

974 

975 Args: 

976 name: A Git object SHA1 or a ref name 

977 Returns: A `ShaFile` object, such as a Commit or Blob 

978 Raises: 

979 KeyError: when the specified ref or object does not exist 

980 """ 

981 if not isinstance(name, bytes): 

982 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

983 if len(name) in (20, 40): 

984 try: 

985 return self.object_store[name] 

986 except (KeyError, ValueError): 

987 pass 

988 try: 

989 return self.object_store[self.refs[name]] 

990 except RefFormatError as exc: 

991 raise KeyError(name) from exc 

992 

993 def __contains__(self, name: bytes) -> bool: 

994 """Check if a specific Git object or ref is present. 

995 

996 Args: 

997 name: Git object SHA1 or ref name 

998 """ 

999 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

1000 return name in self.object_store or name in self.refs 

1001 else: 

1002 return name in self.refs 

1003 

1004 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

1005 """Set a ref. 

1006 

1007 Args: 

1008 name: ref name 

1009 value: Ref value - either a ShaFile object, or a hex sha 

1010 """ 

1011 if name.startswith(b"refs/") or name == b"HEAD": 

1012 if isinstance(value, ShaFile): 

1013 self.refs[name] = value.id 

1014 elif isinstance(value, bytes): 

1015 self.refs[name] = value 

1016 else: 

1017 raise TypeError(value) 

1018 else: 

1019 raise ValueError(name) 

1020 

1021 def __delitem__(self, name: bytes) -> None: 

1022 """Remove a ref. 

1023 

1024 Args: 

1025 name: Name of the ref to remove 

1026 """ 

1027 if name.startswith(b"refs/") or name == b"HEAD": 

1028 del self.refs[name] 

1029 else: 

1030 raise ValueError(name) 

1031 

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated: call :func:`get_user_identity` directly instead; this
        wrapper only emits a DeprecationWarning and delegates.

        Args:
          config: Configuration stack to read the identity from
          kind: Accepted for backwards compatibility; not used by this wrapper

        Returns: Identity as bytes (e.g. b"Name <email>")
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

1041 

1042 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

1043 """Add or modify graftpoints. 

1044 

1045 Args: 

1046 updated_graftpoints: Dict of commit shas to list of parent shas 

1047 """ 

1048 # Simple validation 

1049 for commit, parents in updated_graftpoints.items(): 

1050 for sha in [commit, *parents]: 

1051 check_hexsha(sha, "Invalid graftpoint") 

1052 

1053 self._graftpoints.update(updated_graftpoints) 

1054 

1055 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None: 

1056 """Remove graftpoints. 

1057 

1058 Args: 

1059 to_remove: List of commit shas 

1060 """ 

1061 for sha in to_remove: 

1062 del self._graftpoints[sha] 

1063 

1064 def _read_heads(self, name: str) -> list[bytes]: 

1065 f = self.get_named_file(name) 

1066 if f is None: 

1067 return [] 

1068 with f: 

1069 return [line.strip() for line in f.readlines() if line.strip()] 

1070 

1071 def get_worktree(self) -> "WorkTree": 

1072 """Get the working tree for this repository. 

1073 

1074 Returns: 

1075 WorkTree instance for performing working tree operations 

1076 

1077 Raises: 

1078 NotImplementedError: If the repository doesn't support working trees 

1079 """ 

1080 raise NotImplementedError( 

1081 "Working tree operations not supported by this repository type" 

1082 ) 

1083 

    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp: Optional[float] = None,
        commit_timezone: Optional[int] = None,
        author_timestamp: Optional[float] = None,
        author_timezone: Optional[int] = None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        Deprecated: use ``get_worktree().commit(...)`` instead; this method
        only forwards its arguments.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        # Deprecated shim: all real work happens in WorkTree.commit().
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )

1147 

1148 

def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: if the content does not start with ``gitdir: ``
    """
    cs = f.read()
    if not cs.startswith(b"gitdir: "):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Strip the trailing newline, tolerating CRLF endings from gitfiles
    # written on Windows (consistent with how COMMONDIR is read in
    # Repo.__init__, which uses rstrip(b"\r\n")).
    return cs[len(b"gitdir: ") :].rstrip(b"\r\n").decode("utf-8")

1162 

1163 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        # Pass a message to Exception so str(exc) is informative instead of
        # empty; the raw value stays available via the ``version`` attribute.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1174 

1175 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc) is informative instead of
        # empty; the raw value stays available via ``extension``.
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension

1186 

1187 

class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Working-copy root (non-bare) or control directory (bare); see __init__.
    path: str
    # True when this repository has no working tree.
    bare: bool
    # On-disk object store (set via BaseRepo.__init__ in __init__).
    object_store: DiskObjectStore
    # Filter (clean/smudge) context; initialized to None and created lazily.
    filter_context: Optional["FilterContext"]

1210 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike[str]],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.
        Raises:
          NotGitRepository: if ``root`` does not look like a git repository
          UnsupportedVersion: for repositoryformatversion other than 0 or 1
          UnsupportedExtension: for unknown entries in [extensions]
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Auto-detect bareness: a .git file, or a .git dir containing an
            # object store, means non-bare; objects/ and refs/ directly under
            # root means bare; anything else is not a repository.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a gitfile pointing at the real control directory
                # (linked-worktree / submodule layout).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # A "commondir" file (linked worktrees) points at the main repo's
        # control directory, where shared state (objects, packed refs) lives.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # Shallow-clone boundaries are recorded as graftpoints as well.
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None

1332 

1333 def get_worktree(self) -> "WorkTree": 

1334 """Get the working tree for this repository. 

1335 

1336 Returns: 

1337 WorkTree instance for performing working tree operations 

1338 """ 

1339 from .worktree import WorkTree 

1340 

1341 return WorkTree(self, self.path) 

1342 

1343 def _write_reflog( 

1344 self, 

1345 ref: bytes, 

1346 old_sha: bytes, 

1347 new_sha: bytes, 

1348 committer: Optional[bytes], 

1349 timestamp: Optional[int], 

1350 timezone: Optional[int], 

1351 message: bytes, 

1352 ) -> None: 

1353 from .reflog import format_reflog_line 

1354 

1355 path = self._reflog_path(ref) 

1356 try: 

1357 os.makedirs(os.path.dirname(path)) 

1358 except FileExistsError: 

1359 pass 

1360 if committer is None: 

1361 config = self.get_config_stack() 

1362 committer = get_user_identity(config) 

1363 check_user_identity(committer) 

1364 if timestamp is None: 

1365 timestamp = int(time.time()) 

1366 if timezone is None: 

1367 timezone = 0 # FIXME 

1368 with open(path, "ab") as f: 

1369 f.write( 

1370 format_reflog_line( 

1371 old_sha, new_sha, committer, timestamp, timezone, message 

1372 ) 

1373 + b"\n" 

1374 ) 

1375 

1376 def _reflog_path(self, ref: bytes) -> str: 

1377 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1378 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1379 

1380 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1381 return os.path.join(base, "logs", os.fsdecode(ref)) 

1382 

1383 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]: 

1384 """Read reflog entries for a reference. 

1385 

1386 Args: 

1387 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1388 

1389 Yields: 

1390 reflog.Entry objects in chronological order (oldest first) 

1391 """ 

1392 from .reflog import read_reflog 

1393 

1394 path = self._reflog_path(ref) 

1395 try: 

1396 with open(path, "rb") as f: 

1397 yield from read_reflog(f) 

1398 except FileNotFoundError: 

1399 return 

1400 

1401 @classmethod 

1402 def discover(cls, start: Union[str, bytes, os.PathLike[str]] = ".") -> "Repo": 

1403 """Iterate parent directories to discover a repository. 

1404 

1405 Return a Repo object for the first parent directory that looks like a 

1406 Git repository. 

1407 

1408 Args: 

1409 start: The directory to start discovery from (defaults to '.') 

1410 """ 

1411 path = os.path.abspath(start) 

1412 while True: 

1413 try: 

1414 return cls(path) 

1415 except NotGitRepository: 

1416 new_path, _tail = os.path.split(path) 

1417 if new_path == path: # Root reached 

1418 break 

1419 path = new_path 

1420 start_str = os.fspath(start) 

1421 if isinstance(start_str, bytes): 

1422 start_str = start_str.decode("utf-8") 

1423 raise NotGitRepository(f"No git repository was found at {start_str}") 

1424 

    def controldir(self) -> str:
        """Return the path of the control directory (e.g. ``.git``)."""
        return self._controldir

1428 

    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        # Set in __init__ from the COMMONDIR file when present.
        return self._commondir

1438 

1439 def _determine_file_mode(self) -> bool: 

1440 """Probe the file-system to determine whether permissions can be trusted. 

1441 

1442 Returns: True if permissions can be trusted, False otherwise. 

1443 """ 

1444 fname = os.path.join(self.path, ".probe-permissions") 

1445 with open(fname, "w") as f: 

1446 f.write("") 

1447 

1448 st1 = os.lstat(fname) 

1449 try: 

1450 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1451 except PermissionError: 

1452 return False 

1453 st2 = os.lstat(fname) 

1454 

1455 os.unlink(fname) 

1456 

1457 mode_differs = st1.st_mode != st2.st_mode 

1458 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1459 

1460 return mode_differs and st2_has_exec 

1461 

1462 def _determine_symlinks(self) -> bool: 

1463 """Probe the filesystem to determine whether symlinks can be created. 

1464 

1465 Returns: True if symlinks can be created, False otherwise. 

1466 """ 

1467 # TODO(jelmer): Actually probe disk / look at filesystem 

1468 return sys.platform != "win32" 

1469 

1470 def _put_named_file(self, path: str, contents: bytes) -> None: 

1471 """Write a file to the control dir with the given name and contents. 

1472 

1473 Args: 

1474 path: The path to the file, relative to the control dir. 

1475 contents: A string to write to the file. 

1476 """ 

1477 path = path.lstrip(os.path.sep) 

1478 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1479 f.write(contents) 

1480 

1481 def _del_named_file(self, path: str) -> None: 

1482 try: 

1483 os.unlink(os.path.join(self.controldir(), path)) 

1484 except FileNotFoundError: 

1485 return 

1486 

1487 def get_named_file( 

1488 self, 

1489 path: Union[str, bytes], 

1490 basedir: Optional[str] = None, 

1491 ) -> Optional[BinaryIO]: 

1492 """Get a file from the control dir with a specific name. 

1493 

1494 Although the filename should be interpreted as a filename relative to 

1495 the control dir in a disk-based Repo, the object returned need not be 

1496 pointing to a file in that location. 

1497 

1498 Args: 

1499 path: The path to the file, relative to the control dir. 

1500 basedir: Optional argument that specifies an alternative to the 

1501 control dir. 

1502 Returns: An open file object, or None if the file does not exist. 

1503 """ 

1504 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1505 # the dumb web serving code. 

1506 if basedir is None: 

1507 basedir = self.controldir() 

1508 if isinstance(path, bytes): 

1509 path = path.decode("utf-8") 

1510 path = path.lstrip(os.path.sep) 

1511 try: 

1512 return open(os.path.join(basedir, path), "rb") 

1513 except FileNotFoundError: 

1514 return None 

1515 

    def index_path(self) -> str:
        """Return the path to this repository's index file."""
        return os.path.join(self.controldir(), INDEX_FILENAME)

1519 

1520 def open_index(self) -> "Index": 

1521 """Open the index for this repository. 

1522 

1523 Raises: 

1524 NoIndexPresent: If no index is present 

1525 Returns: The matching `Index` 

1526 """ 

1527 from .index import Index 

1528 

1529 if not self.has_index(): 

1530 raise NoIndexPresent 

1531 

1532 # Check for manyFiles feature configuration 

1533 config = self.get_config_stack() 

1534 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1535 skip_hash = False 

1536 index_version = None 

1537 

1538 if many_files: 

1539 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1540 try: 

1541 index_version_str = config.get(b"index", b"version") 

1542 index_version = int(index_version_str) 

1543 except KeyError: 

1544 index_version = 4 # Default to version 4 for manyFiles 

1545 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1546 else: 

1547 # Check for explicit index settings 

1548 try: 

1549 index_version_str = config.get(b"index", b"version") 

1550 index_version = int(index_version_str) 

1551 except KeyError: 

1552 index_version = None 

1553 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1554 

1555 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1556 

1557 def has_index(self) -> bool: 

1558 """Check if an index is present.""" 

1559 # Bare repos must never have index files; non-bare repos may have a 

1560 # missing index file, which is treated as empty. 

1561 return not self.bare 

1562 

    @replace_me(remove_in="0.26.0")
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike[str], Iterable[Union[str, bytes, os.PathLike[str]]]
        ],
    ) -> None:
        """Stage a set of paths.

        Deprecated: use ``get_worktree().stage(...)`` instead.

        Args:
          fs_paths: List of paths, relative to the repository path
        """
        # Deprecated delegation; the implementation lives in WorkTree.stage().
        return self.get_worktree().stage(fs_paths)

1576 

    @replace_me(remove_in="0.26.0")
    def unstage(self, fs_paths: Sequence[str]) -> None:
        """Unstage specific file in the index.

        Deprecated: use ``get_worktree().unstage(...)`` instead.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
        """
        # Deprecated delegation; the implementation lives in WorkTree.unstage().
        return self.get_worktree().unstage(fs_paths)

1586 

    def clone(
        self,
        target_path: Union[str, bytes, os.PathLike[str]],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: Optional[bool] = None,
        branch: Optional[bytes] = None,
        progress: Optional[Callable[[str], None]] = None,
        depth: Optional[int] = None,
        symlinks: Optional[bool] = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        # NOTE(review): ``progress`` is not referenced in this body — confirm
        # whether it should be forwarded to self.fetch().
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Configure the new repository to fetch from this one.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror local branches under refs/remotes/<origin>/ and copy tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                    head_ref = _set_default_branch(
                        target.refs, origin, origin_head, branch, ref_message
                    )

                    # Update target head
                    if head_ref:
                        head = _set_head(target.refs, head_ref, ref_message)
                    else:
                        head = None

                    if checkout and head is not None:
                        target.get_worktree().reset_index()
            except BaseException:
                # Close the half-initialized target before re-raising.
                target.close()
                raise
        except BaseException:
            # Undo directory creation on any failure when we created it.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1681 

    @replace_me(remove_in="0.26.0")
    def reset_index(self, tree: Optional[bytes] = None) -> None:
        """Reset the index back to a specific tree.

        Deprecated: use ``get_worktree().reset_index(...)`` instead.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        # Deprecated delegation; see WorkTree.reset_index().
        return self.get_worktree().reset_index(tree)

1690 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash means "this directory and everything below".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. empty repo): treat as no match below.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        # "gitdir/i:" is the case-insensitive variant of "gitdir:".
        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1776 

1777 def get_worktree_config(self) -> "ConfigFile": 

1778 """Get the worktree-specific config. 

1779 

1780 Returns: 

1781 ConfigFile object for the worktree config 

1782 """ 

1783 from .config import ConfigFile 

1784 

1785 path = os.path.join(self.commondir(), "config.worktree") 

1786 try: 

1787 # Pass condition matchers for includeIf evaluation 

1788 condition_matchers = self._get_config_condition_matchers() 

1789 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1790 except FileNotFoundError: 

1791 cf = ConfigFile() 

1792 cf.path = path 

1793 return cf 

1794 

1795 def get_config(self) -> "ConfigFile": 

1796 """Retrieve the config object. 

1797 

1798 Returns: `ConfigFile` object for the ``.git/config`` file. 

1799 """ 

1800 from .config import ConfigFile 

1801 

1802 path = os.path.join(self._commondir, "config") 

1803 try: 

1804 # Pass condition matchers for includeIf evaluation 

1805 condition_matchers = self._get_config_condition_matchers() 

1806 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1807 except FileNotFoundError: 

1808 ret = ConfigFile() 

1809 ret.path = path 

1810 return ret 

1811 

1812 def get_rebase_state_manager(self) -> "RebaseStateManager": 

1813 """Get the appropriate rebase state manager for this repository. 

1814 

1815 Returns: DiskRebaseStateManager instance 

1816 """ 

1817 import os 

1818 

1819 from .rebase import DiskRebaseStateManager 

1820 

1821 path = os.path.join(self.controldir(), "rebase-merge") 

1822 return DiskRebaseStateManager(path) 

1823 

1824 def get_description(self) -> Optional[bytes]: 

1825 """Retrieve the description of this repository. 

1826 

1827 Returns: Description as bytes or None. 

1828 """ 

1829 path = os.path.join(self._controldir, "description") 

1830 try: 

1831 with GitFile(path, "rb") as f: 

1832 return f.read() 

1833 except FileNotFoundError: 

1834 return None 

1835 

1836 def __repr__(self) -> str: 

1837 """Return string representation of this repository.""" 

1838 return f"<Repo at {self.path!r}>" 

1839 

1840 def set_description(self, description: bytes) -> None: 

1841 """Set the description for this repository. 

1842 

1843 Args: 

1844 description: Text to set as description for this repository. 

1845 """ 

1846 self._put_named_file("description", description) 

1847 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        controldir: Union[str, bytes, os.PathLike[str]],
        bare: bool,
        object_store: Optional[PackBasedObjectStore] = None,
        config: Optional["StackedConfig"] = None,
        default_branch: Optional[bytes] = None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Shared initialization for :meth:`init` and bare-repo creation.

        Creates the standard control-directory layout and object store,
        points HEAD at the default branch, and writes the initial files.

        Args:
          path: Repository root path
          controldir: Control directory to populate
          bare: Whether this is a bare repository
          object_store: Optional pre-created object store
          config: Optional config used to look up init.defaultBranch
          default_branch: Default branch name; overrides config lookup
          symlinks: Whether symlinks are supported (autodetected if None)
          format: Repository format version
        Returns: The opened `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            # Honour init.defaultBranch from config, falling back to the
            # library default.
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1883 

1884 @classmethod 

1885 def init( 

1886 cls, 

1887 path: Union[str, bytes, os.PathLike[str]], 

1888 *, 

1889 mkdir: bool = False, 

1890 config: Optional["StackedConfig"] = None, 

1891 default_branch: Optional[bytes] = None, 

1892 symlinks: Optional[bool] = None, 

1893 format: Optional[int] = None, 

1894 ) -> "Repo": 

1895 """Create a new repository. 

1896 

1897 Args: 

1898 path: Path in which to create the repository 

1899 mkdir: Whether to create the directory 

1900 config: Configuration object 

1901 default_branch: Default branch name 

1902 symlinks: Whether to support symlinks 

1903 format: Repository format version (defaults to 0) 

1904 Returns: `Repo` instance 

1905 """ 

1906 path = os.fspath(path) 

1907 if isinstance(path, bytes): 

1908 path = os.fsdecode(path) 

1909 if mkdir: 

1910 os.mkdir(path) 

1911 controldir = os.path.join(path, CONTROLDIR) 

1912 os.mkdir(controldir) 

1913 _set_filesystem_hidden(controldir) 

1914 return cls._init_maybe_bare( 

1915 path, 

1916 controldir, 

1917 False, 

1918 config=config, 

1919 default_branch=default_branch, 

1920 symlinks=symlinks, 

1921 format=format, 

1922 ) 

1923 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        main_repo: "Repo",
        identifier: Optional[str] = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The worktree's .git is a gitfile pointing at its control directory
        # under the main repo's worktrees/ area.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Cross-link: gitdir points back at the worktree's .git file, and
        # commondir points at the main control directory for shared state.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        # Populate the fresh worktree's index from HEAD.
        r.get_worktree().reset_index()
        return r

1972 

1973 @classmethod 

1974 def init_bare( 

1975 cls, 

1976 path: Union[str, bytes, os.PathLike[str]], 

1977 *, 

1978 mkdir: bool = False, 

1979 object_store: Optional[PackBasedObjectStore] = None, 

1980 config: Optional["StackedConfig"] = None, 

1981 default_branch: Optional[bytes] = None, 

1982 format: Optional[int] = None, 

1983 ) -> "Repo": 

1984 """Create a new bare repository. 

1985 

1986 ``path`` should already exist and be an empty directory. 

1987 

1988 Args: 

1989 path: Path to create bare repository in 

1990 mkdir: Whether to create the directory 

1991 object_store: Object store to use 

1992 config: Configuration object 

1993 default_branch: Default branch name 

1994 format: Repository format version (defaults to 0) 

1995 Returns: a `Repo` instance 

1996 """ 

1997 path = os.fspath(path) 

1998 if isinstance(path, bytes): 

1999 path = os.fsdecode(path) 

2000 if mkdir: 

2001 os.mkdir(path) 

2002 return cls._init_maybe_bare( 

2003 path, 

2004 path, 

2005 True, 

2006 object_store=object_store, 

2007 config=config, 

2008 default_branch=default_branch, 

2009 format=format, 

2010 ) 

2011 

2012 create = init_bare 

2013 

2014 def close(self) -> None: 

2015 """Close any files opened by this repository.""" 

2016 self.object_store.close() 

2017 # Clean up filter context if it was created 

2018 if self.filter_context is not None: 

2019 self.filter_context.close() 

2020 self.filter_context = None 

2021 

2022 def __enter__(self) -> "Repo": 

2023 """Enter context manager.""" 

2024 return self 

2025 

2026 def __exit__( 

2027 self, 

2028 exc_type: Optional[type[BaseException]], 

2029 exc_val: Optional[BaseException], 

2030 exc_tb: Optional[TracebackType], 

2031 ) -> None: 

2032 """Exit context manager and close repository.""" 

2033 self.close() 

2034 

2035 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

2036 """Read .gitattributes file from working tree. 

2037 

2038 Returns: 

2039 Dictionary mapping file patterns to attributes 

2040 """ 

2041 gitattributes = {} 

2042 gitattributes_path = os.path.join(self.path, ".gitattributes") 

2043 

2044 if os.path.exists(gitattributes_path): 

2045 with open(gitattributes_path, "rb") as f: 

2046 for line in f: 

2047 line = line.strip() 

2048 if not line or line.startswith(b"#"): 

2049 continue 

2050 

2051 parts = line.split() 

2052 if len(parts) < 2: 

2053 continue 

2054 

2055 pattern = parts[0] 

2056 attrs = {} 

2057 

2058 for attr in parts[1:]: 

2059 if attr.startswith(b"-"): 

2060 # Unset attribute 

2061 attrs[attr[1:]] = b"false" 

2062 elif b"=" in attr: 

2063 # Set to value 

2064 key, value = attr.split(b"=", 1) 

2065 attrs[key] = value 

2066 else: 

2067 # Set attribute 

2068 attrs[attr] = b"true" 

2069 

2070 gitattributes[pattern] = attrs 

2071 

2072 return gitattributes 

2073 

2074 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

2075 """Return a BlobNormalizer object.""" 

2076 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry 

2077 

2078 # Get fresh configuration and GitAttributes 

2079 config_stack = self.get_config_stack() 

2080 git_attributes = self.get_gitattributes() 

2081 

2082 # Lazily create FilterContext if needed 

2083 if self.filter_context is None: 

2084 filter_registry = FilterRegistry(config_stack, self) 

2085 self.filter_context = FilterContext(filter_registry) 

2086 else: 

2087 # Refresh the context with current config to handle config changes 

2088 self.filter_context.refresh_config(config_stack) 

2089 

2090 # Return a new FilterBlobNormalizer with the context 

2091 return FilterBlobNormalizer( 

2092 config_stack, git_attributes, filter_context=self.filter_context 

2093 ) 

2094 

2095 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

2096 """Read gitattributes for the repository. 

2097 

2098 Args: 

2099 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

2100 

2101 Returns: 

2102 GitAttributes object that can be used to match paths 

2103 """ 

2104 from .attrs import ( 

2105 GitAttributes, 

2106 Pattern, 

2107 parse_git_attributes, 

2108 ) 

2109 

2110 patterns = [] 

2111 

2112 # Read system gitattributes (TODO: implement this) 

2113 # Read global gitattributes (TODO: implement this) 

2114 

2115 # Read repository .gitattributes from index/tree 

2116 if tree is None: 

2117 try: 

2118 # Try to get from HEAD 

2119 head = self[b"HEAD"] 

2120 if isinstance(head, Tag): 

2121 _cls, obj = head.object 

2122 head = self.get_object(obj) 

2123 assert isinstance(head, Commit) 

2124 tree = head.tree 

2125 except KeyError: 

2126 # No HEAD, no attributes from tree 

2127 pass 

2128 

2129 if tree is not None: 

2130 try: 

2131 tree_obj = self[tree] 

2132 assert isinstance(tree_obj, Tree) 

2133 if b".gitattributes" in tree_obj: 

2134 _, attrs_sha = tree_obj[b".gitattributes"] 

2135 attrs_blob = self[attrs_sha] 

2136 if isinstance(attrs_blob, Blob): 

2137 attrs_data = BytesIO(attrs_blob.data) 

2138 for pattern_bytes, attrs in parse_git_attributes(attrs_data): 

2139 pattern = Pattern(pattern_bytes) 

2140 patterns.append((pattern, attrs)) 

2141 except (KeyError, NotTreeError): 

2142 pass 

2143 

2144 # Read .git/info/attributes 

2145 info_attrs_path = os.path.join(self.controldir(), "info", "attributes") 

2146 if os.path.exists(info_attrs_path): 

2147 with open(info_attrs_path, "rb") as f: 

2148 for pattern_bytes, attrs in parse_git_attributes(f): 

2149 pattern = Pattern(pattern_bytes) 

2150 patterns.append((pattern, attrs)) 

2151 

2152 # Read .gitattributes from working directory (if it exists) 

2153 working_attrs_path = os.path.join(self.path, ".gitattributes") 

2154 if os.path.exists(working_attrs_path): 

2155 with open(working_attrs_path, "rb") as f: 

2156 for pattern_bytes, attrs in parse_git_attributes(f): 

2157 pattern = Pattern(pattern_bytes) 

2158 patterns.append((pattern, attrs)) 

2159 

2160 return GitAttributes(patterns) 

2161 

    @replace_me(remove_in="0.26.0")
    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir.

        Deprecated: delegates to the worktree; use
        ``get_worktree()._sparse_checkout_file_path()`` directly.
        """
        return self.get_worktree()._sparse_checkout_file_path()

2166 

    @replace_me(remove_in="0.26.0")
    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Deprecated: delegates to the worktree; use
        ``get_worktree().configure_for_cone_mode()`` directly.
        """
        return self.get_worktree().configure_for_cone_mode()

2171 

    @replace_me(remove_in="0.26.0")
    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

        Deprecated: delegates to the worktree; use
        ``get_worktree().infer_cone_mode()`` directly.
        """
        return self.get_worktree().infer_cone_mode()

2176 

    @replace_me(remove_in="0.26.0")
    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Deprecated: delegates to the worktree; use
        ``get_worktree().get_sparse_checkout_patterns()`` directly.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        return self.get_worktree().get_sparse_checkout_patterns()

2185 

    @replace_me(remove_in="0.26.0")
    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Deprecated: delegates to the worktree; use
        ``get_worktree().set_sparse_checkout_patterns()`` directly.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        return self.get_worktree().set_sparse_checkout_patterns(patterns)

2196 

    @replace_me(remove_in="0.26.0")
    def set_cone_mode_patterns(self, dirs: Union[Sequence[str], None] = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        For each directory to include, add an inclusion line that "undoes" the prior
        ``!/*/`` 'exclude' that re-includes that directory and everything under it.
        Never add the same line twice.

        Deprecated: delegates to the worktree; use
        ``get_worktree().set_cone_mode_patterns()`` directly.
        """
        return self.get_worktree().set_cone_mode_patterns(dirs)

2206 

2207 

class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    # Lazily created by get_blob_normalizer(); reset to None by close().
    filter_context: Optional["FilterContext"]

2216 

2217 def __init__(self) -> None: 

2218 """Create a new repository in memory.""" 

2219 from .config import ConfigFile 

2220 

2221 self._reflog: list[Any] = [] 

2222 refs_container = DictRefsContainer({}, logger=self._append_reflog) 

2223 BaseRepo.__init__(self, MemoryObjectStore(), refs_container) 

2224 self._named_files: dict[str, bytes] = {} 

2225 self.bare = True 

2226 self._config = ConfigFile() 

2227 self._description: Optional[bytes] = None 

2228 self.filter_context = None 

2229 

2230 def _append_reflog( 

2231 self, 

2232 ref: bytes, 

2233 old_sha: Optional[bytes], 

2234 new_sha: Optional[bytes], 

2235 committer: Optional[bytes], 

2236 timestamp: Optional[int], 

2237 timezone: Optional[int], 

2238 message: Optional[bytes], 

2239 ) -> None: 

2240 self._reflog.append( 

2241 (ref, old_sha, new_sha, committer, timestamp, timezone, message) 

2242 ) 

2243 

2244 def set_description(self, description: bytes) -> None: 

2245 """Set the description for this repository. 

2246 

2247 Args: 

2248 description: Text to set as description 

2249 """ 

2250 self._description = description 

2251 

2252 def get_description(self) -> Optional[bytes]: 

2253 """Get the description of this repository. 

2254 

2255 Returns: 

2256 Repository description as bytes 

2257 """ 

2258 return self._description 

2259 

2260 def _determine_file_mode(self) -> bool: 

2261 """Probe the file-system to determine whether permissions can be trusted. 

2262 

2263 Returns: True if permissions can be trusted, False otherwise. 

2264 """ 

2265 return sys.platform != "win32" 

2266 

    def _determine_symlinks(self) -> bool:
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise.
        """
        # Symlink support is assumed everywhere except native Windows.
        return sys.platform != "win32"

2273 

2274 def _put_named_file(self, path: str, contents: bytes) -> None: 

2275 """Write a file to the control dir with the given name and contents. 

2276 

2277 Args: 

2278 path: The path to the file, relative to the control dir. 

2279 contents: A string to write to the file. 

2280 """ 

2281 self._named_files[path] = contents 

2282 

2283 def _del_named_file(self, path: str) -> None: 

2284 try: 

2285 del self._named_files[path] 

2286 except KeyError: 

2287 pass 

2288 

2289 def get_named_file( 

2290 self, 

2291 path: Union[str, bytes], 

2292 basedir: Optional[str] = None, 

2293 ) -> Optional[BytesIO]: 

2294 """Get a file from the control dir with a specific name. 

2295 

2296 Although the filename should be interpreted as a filename relative to 

2297 the control dir in a disk-baked Repo, the object returned need not be 

2298 pointing to a file in that location. 

2299 

2300 Args: 

2301 path: The path to the file, relative to the control dir. 

2302 basedir: Optional base directory for the path 

2303 Returns: An open file object, or None if the file does not exist. 

2304 """ 

2305 path_str = path.decode() if isinstance(path, bytes) else path 

2306 contents = self._named_files.get(path_str, None) 

2307 if contents is None: 

2308 return None 

2309 return BytesIO(contents) 

2310 

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        # MemoryRepo is always bare, so there is never an index to open.
        raise NoIndexPresent

2318 

2319 def get_config(self) -> "ConfigFile": 

2320 """Retrieve the config object. 

2321 

2322 Returns: `ConfigFile` object. 

2323 """ 

2324 return self._config 

2325 

    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        # In-memory repos track rebase state in memory rather than on disk.
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)

2334 

2335 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

2336 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2337 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry 

2338 

2339 # Get fresh configuration and GitAttributes 

2340 config_stack = self.get_config_stack() 

2341 git_attributes = self.get_gitattributes() 

2342 

2343 # Lazily create FilterContext if needed 

2344 if self.filter_context is None: 

2345 filter_registry = FilterRegistry(config_stack, self) 

2346 self.filter_context = FilterContext(filter_registry) 

2347 else: 

2348 # Refresh the context with current config to handle config changes 

2349 self.filter_context.refresh_config(config_stack) 

2350 

2351 # Return a new FilterBlobNormalizer with the context 

2352 return FilterBlobNormalizer( 

2353 config_stack, git_attributes, filter_context=self.filter_context 

2354 ) 

2355 

2356 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

2357 """Read gitattributes for the repository.""" 

2358 from .attrs import GitAttributes 

2359 

2360 # Memory repos don't have working trees or gitattributes files 

2361 # Return empty GitAttributes 

2362 return GitAttributes([]) 

2363 

2364 def close(self) -> None: 

2365 """Close any resources opened by this repository.""" 

2366 # Clean up filter context if it was created 

2367 if self.filter_context is not None: 

2368 self.filter_context.close() 

2369 self.filter_context = None 

2370 

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp: Optional[float] = None,
        commit_timezone: Optional[int] = None,
        author_timestamp: Optional[float] = None,
        author_timezone: Optional[int] = None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message, as bytes or a callable taking
            (repo, commit) and returning the message bytes
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use (required for MemoryRepo)
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: If ``tree`` is missing or malformed, or no message is
            available.
        """
        import time

        from .objects import Commit

        # There is no index to derive a tree from, so the caller must
        # supply one explicitly.
        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            # Fall back to the identity configured for this repository.
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # GMT by default.
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author time defaults to the commit time.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message is invoked with the repo and the partially
        # populated commit, so it can inspect the metadata set above.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                # Compare-and-swap guards against concurrent ref updates.
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet (e.g. first commit on a branch).
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id

2500 

2501 @classmethod 

2502 def init_bare( 

2503 cls, 

2504 objects: Iterable[ShaFile], 

2505 refs: Mapping[bytes, bytes], 

2506 format: Optional[int] = None, 

2507 ) -> "MemoryRepo": 

2508 """Create a new bare repository in memory. 

2509 

2510 Args: 

2511 objects: Objects for the new repository, 

2512 as iterable 

2513 refs: Refs as dictionary, mapping names 

2514 to object SHA1s 

2515 format: Repository format version (defaults to 0) 

2516 """ 

2517 ret = cls() 

2518 for obj in objects: 

2519 ret.object_store.add_object(obj) 

2520 for refname, sha in refs.items(): 

2521 ret.refs.add_if_new(refname, sha) 

2522 ret._init_files(bare=True, format=format) 

2523 return ret