Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

990 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable, Iterator 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 TypeVar, 

46 Union, 

47) 

48 

49if TYPE_CHECKING: 

50 # There are no circular imports here, but we try to defer imports as long 

51 # as possible to reduce start-up time for anything that doesn't need 

52 # these imports. 

53 from .attrs import GitAttributes 

54 from .config import ConditionMatcher, ConfigFile, StackedConfig 

55 from .index import Index 

56 from .line_ending import BlobNormalizer 

57 from .notes import Notes 

58 from .object_store import BaseObjectStore, GraphWalker, UnpackedObject 

59 from .rebase import RebaseStateManager 

60 from .walk import Walker 

61 from .worktree import WorkTree 

62 

63from . import replace_me 

64from .errors import ( 

65 NoIndexPresent, 

66 NotBlobError, 

67 NotCommitError, 

68 NotGitRepository, 

69 NotTagError, 

70 NotTreeError, 

71 RefFormatError, 

72) 

73from .file import GitFile 

74from .hooks import ( 

75 CommitMsgShellHook, 

76 Hook, 

77 PostCommitShellHook, 

78 PostReceiveShellHook, 

79 PreCommitShellHook, 

80) 

81from .object_store import ( 

82 DiskObjectStore, 

83 MemoryObjectStore, 

84 MissingObjectFinder, 

85 ObjectStoreGraphWalker, 

86 PackBasedObjectStore, 

87 find_shallow, 

88 peel_sha, 

89) 

90from .objects import ( 

91 Blob, 

92 Commit, 

93 ObjectID, 

94 ShaFile, 

95 Tag, 

96 Tree, 

97 check_hexsha, 

98 valid_hexsha, 

99) 

100from .pack import generate_unpacked_objects 

101from .refs import ( 

102 ANNOTATED_TAG_SUFFIX, # noqa: F401 

103 LOCAL_BRANCH_PREFIX, 

104 LOCAL_TAG_PREFIX, # noqa: F401 

105 SYMREF, # noqa: F401 

106 DictRefsContainer, 

107 DiskRefsContainer, 

108 InfoRefsContainer, # noqa: F401 

109 Ref, 

110 RefsContainer, 

111 _set_default_branch, 

112 _set_head, 

113 _set_origin_head, 

114 check_ref_format, # noqa: F401 

115 is_per_worktree_ref, 

116 read_packed_refs, # noqa: F401 

117 read_packed_refs_with_peeled, # noqa: F401 

118 serialize_refs, 

119 write_packed_refs, # noqa: F401 

120) 

121 

# Well-known names inside (and of) a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"

# Type variable bound to ShaFile, used by typed object accessors.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created inside a fresh control dir, as path components.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

DEFAULT_BRANCH = b"master"

144 

145 

class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the malformed identity string.

        Args:
            identity: The identity value that failed validation.
        """
        self.identity = identity

152 

153 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined."""

156 

157 

# TODO(jelmer): Cache?
def _get_default_identity() -> tuple[str, str]:
    """Determine a (fullname, email) identity from the host system.

    The username is the first of the LOGNAME, USER, LNAME and USERNAME
    environment variables that is set.  The full name comes from the
    passwd gecos field when available, otherwise the username.  The
    email is $EMAIL when set, otherwise ``username@hostname``.

    Raises:
        DefaultIdentityNotFound: if no username could be determined.
    """
    import socket

    for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
        username = os.environ.get(name)
        if username:
            break
    else:
        # No environment variable yielded a (non-empty) username.
        username = None

    try:
        import pwd
    except ImportError:
        # Non-POSIX platform: no passwd database to consult.
        fullname = None
    else:
        try:
            entry = pwd.getpwuid(os.getuid())  # type: ignore
        except KeyError:
            fullname = None
        else:
            if getattr(entry, "gecos", None):
                # gecos is comma-separated; the first field is
                # conventionally the user's full name.
                fullname = entry.pw_gecos.split(",")[0]
            else:
                fullname = None
            if username is None:
                # Fall back to the passwd login name.
                username = entry.pw_name
    if not fullname:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        fullname = username
    email = os.environ.get("EMAIL")
    if email is None:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        email = f"{username}@{socket.gethostname()}"
    return (fullname, email)

195 

196 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
        config: Configuration stack to read from
        kind: Optional kind to return identity for,
            usually either "AUTHOR" or "COMMITTER".

    Returns:
        A user identity bytestring of the form ``name <email>``
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Only probe the host system when something is still missing.
        # Previously this was called unconditionally, which did needless
        # work and could raise DefaultIdentityNotFound even though the
        # identity was fully specified via environment or configuration.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    if email.startswith(b"<") and email.endswith(b">"):
        # Strip surrounding angle brackets; they are re-added below.
        email = email[1:-1]
    return user + b" <" + email + b">"

246 

247 

def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``name <email>`` and may contain
    neither NUL bytes nor newlines.

    Args:
        identity: User identity bytestring
    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    decoded = identity.decode("utf-8", "replace")
    try:
        _name, rest = identity.split(b" <", 1)
    except ValueError as exc:
        raise InvalidUserIdentity(decoded) from exc
    if b">" not in rest:
        raise InvalidUserIdentity(decoded)
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(decoded)

264 

265 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoint lines into a dict.

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    The resulting dictionary maps:
        <commit sha1> -> [<parent sha1>, ...]

    Args:
        graftpoints: Iterator of graftpoint lines

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Every SHA on the line must be a valid hex sha.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

297 

298 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into a newline-separated bytestring.

    The graft dictionary maps:
        <commit sha1> -> [<parent sha1>, ...]

    Each output line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    lines = [
        b" ".join([commit, *parents]) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

318 

319 

320def _set_filesystem_hidden(path: str) -> None: 

321 """Mark path as to be hidden if supported by platform and filesystem. 

322 

323 On win32 uses SetFileAttributesW api: 

324 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

325 """ 

326 if sys.platform == "win32": 

327 import ctypes 

328 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

329 

330 FILE_ATTRIBUTE_HIDDEN = 2 

331 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

332 ("SetFileAttributesW", ctypes.windll.kernel32) 

333 ) 

334 

335 if isinstance(path, bytes): 

336 path = os.fsdecode(path) 

337 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

338 pass # Could raise or log `ctypes.WinError()` here 

339 

340 # Could implement other platform specific filesystem hiding here 

341 

342 

class ParentsProvider:
    """Provider for commit parent information.

    Lookups honour grafts and shallow commits, prefer the store's
    commit graph when one exists, and fall back to reading the commit
    object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict] = None,
        shallows: Iterable[bytes] = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Optional mapping of commit SHA to replacement parent
                SHAs (graft information)
            shallows: Shallow commit SHAs; these report no parents
        """
        self.store = store
        # The previous signature used mutable defaults (``grafts={}``,
        # ``shallows=[]``); since the dict was stored directly on the
        # instance, all instances created without an explicit mapping
        # shared one dict.  Use None/() sentinels instead.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit.

        Args:
            commit_id: SHA of the commit to look up
            commit: Optional pre-loaded commit matching ``commit_id``

        Returns:
            List of parent commit SHAs
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        return commit.parents

389 

390 

class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
            object_store: Object store to use
            refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Per-instance graft and hook state, populated by subclasses.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        self.hooks: dict[str, Hook] = {}

419 

420 def _determine_file_mode(self) -> bool: 

421 """Probe the file-system to determine whether permissions can be trusted. 

422 

423 Returns: True if permissions can be trusted, False otherwise. 

424 """ 

425 raise NotImplementedError(self._determine_file_mode) 

426 

427 def _determine_symlinks(self) -> bool: 

428 """Probe the filesystem to determine whether symlinks can be created. 

429 

430 Returns: True if symlinks can be created, False otherwise. 

431 """ 

432 # For now, just mimic the old behaviour 

433 return sys.platform != "win32" 

434 

    def _init_files(
        self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
    ) -> None:
        """Initialize a default set of named files.

        Writes ``description``, ``config`` and ``info/exclude`` into the
        control directory.

        Args:
            bare: Whether the repository is bare (no working tree).
            symlinks: Whether symlinks are supported; probed when None and
                the repository is not bare.
            format: Repository format version; 0 (the default) or 1.

        Raises:
            ValueError: If ``format`` is not a supported version.
        """
        from .config import ConfigFile

        self._put_named_file("description", b"Unnamed repository")
        f = BytesIO()
        cf = ConfigFile()
        if format is None:
            format = 0
        if format not in (0, 1):
            raise ValueError(f"Unsupported repository format version: {format}")
        cf.set("core", "repositoryformatversion", str(format))
        # filemode reflects whether the backing filesystem preserves
        # executable bits.
        if self._determine_file_mode():
            cf.set("core", "filemode", True)
        else:
            cf.set("core", "filemode", False)

        if symlinks is None and not bare:
            symlinks = self._determine_symlinks()

        # Only record the setting when symlinks are known NOT to work.
        if symlinks is False:
            cf.set("core", "symlinks", symlinks)

        cf.set("core", "bare", bare)
        cf.set("core", "logallrefupdates", True)
        cf.write_to_file(f)
        self._put_named_file("config", f.getvalue())
        self._put_named_file(os.path.join("info", "exclude"), b"")

465 

466 def get_named_file(self, path: str) -> Optional[BinaryIO]: 

467 """Get a file from the control dir with a specific name. 

468 

469 Although the filename should be interpreted as a filename relative to 

470 the control dir in a disk-based Repo, the object returned need not be 

471 pointing to a file in that location. 

472 

473 Args: 

474 path: The path to the file, relative to the control dir. 

475 Returns: An open file object, or None if the file does not exist. 

476 """ 

477 raise NotImplementedError(self.get_named_file) 

478 

479 def _put_named_file(self, path: str, contents: bytes) -> None: 

480 """Write a file to the control dir with the given name and contents. 

481 

482 Args: 

483 path: The path to the file, relative to the control dir. 

484 contents: A string to write to the file. 

485 """ 

486 raise NotImplementedError(self._put_named_file) 

487 

488 def _del_named_file(self, path: str) -> None: 

489 """Delete a file in the control directory with the given name.""" 

490 raise NotImplementedError(self._del_named_file) 

491 

492 def open_index(self) -> "Index": 

493 """Open the index for this repository. 

494 

495 Raises: 

496 NoIndexPresent: If no index is present 

497 Returns: The matching `Index` 

498 """ 

499 raise NotImplementedError(self.open_index) 

500 

501 def fetch( 

502 self, 

503 target: "BaseRepo", 

504 determine_wants: Optional[Callable] = None, 

505 progress: Optional[Callable] = None, 

506 depth: Optional[int] = None, 

507 ) -> dict: 

508 """Fetch objects into another repository. 

509 

510 Args: 

511 target: The target repository 

512 determine_wants: Optional function to determine what refs to 

513 fetch. 

514 progress: Optional progress function 

515 depth: Optional shallow fetch depth 

516 Returns: The local refs 

517 """ 

518 if determine_wants is None: 

519 determine_wants = target.object_store.determine_wants_all 

520 count, pack_data = self.fetch_pack_data( 

521 determine_wants, 

522 target.get_graph_walker(), 

523 progress=progress, 

524 depth=depth, 

525 ) 

526 target.object_store.add_pack_data(count, pack_data, progress) 

527 return self.get_refs() 

528 

529 def fetch_pack_data( 

530 self, 

531 determine_wants: Callable, 

532 graph_walker: "GraphWalker", 

533 progress: Optional[Callable], 

534 *, 

535 get_tagged: Optional[Callable] = None, 

536 depth: Optional[int] = None, 

537 ) -> tuple: 

538 """Fetch the pack data required for a set of revisions. 

539 

540 Args: 

541 determine_wants: Function that takes a dictionary with heads 

542 and returns the list of heads to fetch. 

543 graph_walker: Object that can iterate over the list of revisions 

544 to fetch and has an "ack" method that will be called to acknowledge 

545 that a revision is present. 

546 progress: Simple progress function that will be called with 

547 updated progress strings. 

548 get_tagged: Function that returns a dict of pointed-to sha -> 

549 tag sha for including tags. 

550 depth: Shallow fetch depth 

551 Returns: count and iterator over pack data 

552 """ 

553 missing_objects = self.find_missing_objects( 

554 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

555 ) 

556 if missing_objects is None: 

557 return 0, iter([]) 

558 remote_has = missing_objects.get_remote_has() 

559 object_ids = list(missing_objects) 

560 return len(object_ids), generate_unpacked_objects( 

561 self.object_store, object_ids, progress=progress, other_haves=remote_has 

562 ) 

563 

    def find_missing_objects(
        self,
        determine_wants: Callable,
        graph_walker: "GraphWalker",
        progress: Optional[Callable],
        *,
        get_tagged: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to
                acknowledge that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
            when nothing should be sent (shallow short-circuit).
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before any depth processing.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow  # type: ignore[attr-defined]
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): if depth is set but graph_walker has no
            # "shallow" attribute, ``unshallow`` is left unbound for the
            # checks below — presumably every walker used with a depth
            # has that attribute; confirm upstream.
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                """Dummy finder that returns no missing objects."""

                def get_remote_has(self) -> None:
                    """Get remote has (always returns None)."""
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self) -> Iterator[tuple[bytes, Optional[bytes]]]:
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider."""
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

670 

671 def generate_pack_data( 

672 self, 

673 have: Iterable[ObjectID], 

674 want: Iterable[ObjectID], 

675 progress: Optional[Callable[[str], None]] = None, 

676 ofs_delta: Optional[bool] = None, 

677 ) -> tuple[int, Iterator["UnpackedObject"]]: 

678 """Generate pack data objects for a set of wants/haves. 

679 

680 Args: 

681 have: List of SHA1s of objects that should not be sent 

682 want: List of SHA1s of objects that should be sent 

683 ofs_delta: Whether OFS deltas can be included 

684 progress: Optional progress reporting method 

685 """ 

686 return self.object_store.generate_pack_data( 

687 have, 

688 want, 

689 shallow=self.get_shallow(), 

690 progress=progress, 

691 ofs_delta=ofs_delta, 

692 ) 

693 

694 def get_graph_walker( 

695 self, heads: Optional[list[ObjectID]] = None 

696 ) -> ObjectStoreGraphWalker: 

697 """Retrieve a graph walker. 

698 

699 A graph walker is used by a remote repository (or proxy) 

700 to find out which objects are present in this repository. 

701 

702 Args: 

703 heads: Repository heads to use (optional) 

704 Returns: A graph walker object 

705 """ 

706 if heads is None: 

707 heads = [ 

708 sha 

709 for sha in self.refs.as_dict(b"refs/heads").values() 

710 if sha in self.object_store 

711 ] 

712 parents_provider = ParentsProvider(self.object_store) 

713 return ObjectStoreGraphWalker( 

714 heads, 

715 parents_provider.get_parents, 

716 shallow=self.get_shallow(), 

717 update_shallow=self.update_shallow, 

718 ) 

719 

720 def get_refs(self) -> dict[bytes, bytes]: 

721 """Get dictionary with all refs. 

722 

723 Returns: A ``dict`` mapping ref names to SHA1s 

724 """ 

725 return self.refs.as_dict() 

726 

727 def head(self) -> bytes: 

728 """Return the SHA1 pointed at by HEAD.""" 

729 # TODO: move this method to WorkTree 

730 return self.refs[b"HEAD"] 

731 

732 def _get_object(self, sha: bytes, cls: type[T]) -> T: 

733 assert len(sha) in (20, 40) 

734 ret = self.get_object(sha) 

735 if not isinstance(ret, cls): 

736 if cls is Commit: 

737 raise NotCommitError(ret.id) 

738 elif cls is Blob: 

739 raise NotBlobError(ret.id) 

740 elif cls is Tree: 

741 raise NotTreeError(ret.id) 

742 elif cls is Tag: 

743 raise NotTagError(ret.id) 

744 else: 

745 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

746 return ret 

747 

748 def get_object(self, sha: bytes) -> ShaFile: 

749 """Retrieve the object with the specified SHA. 

750 

751 Args: 

752 sha: SHA to retrieve 

753 Returns: A ShaFile object 

754 Raises: 

755 KeyError: when the object can not be found 

756 """ 

757 return self.object_store[sha] 

758 

759 def parents_provider(self) -> ParentsProvider: 

760 """Get a parents provider for this repository. 

761 

762 Returns: 

763 ParentsProvider instance configured with grafts and shallows 

764 """ 

765 return ParentsProvider( 

766 self.object_store, 

767 grafts=self._graftpoints, 

768 shallows=self.get_shallow(), 

769 ) 

770 

771 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]: 

772 """Retrieve the parents of a specific commit. 

773 

774 If the specific commit is a graftpoint, the graft parents 

775 will be returned instead. 

776 

777 Args: 

778 sha: SHA of the commit for which to retrieve the parents 

779 commit: Optional commit matching the sha 

780 Returns: List of parents 

781 """ 

782 return self.parents_provider().get_parents(sha, commit) 

783 

784 def get_config(self) -> "ConfigFile": 

785 """Retrieve the config object. 

786 

787 Returns: `ConfigFile` object for the ``.git/config`` file. 

788 """ 

789 raise NotImplementedError(self.get_config) 

790 

791 def get_worktree_config(self) -> "ConfigFile": 

792 """Retrieve the worktree config object.""" 

793 raise NotImplementedError(self.get_worktree_config) 

794 

795 def get_description(self) -> Optional[str]: 

796 """Retrieve the description for this repository. 

797 

798 Returns: String with the description of the repository 

799 as set by the user. 

800 """ 

801 raise NotImplementedError(self.get_description) 

802 

803 def set_description(self, description: bytes) -> None: 

804 """Set the description for this repository. 

805 

806 Args: 

807 description: Text to set as description for this repository. 

808 """ 

809 raise NotImplementedError(self.set_description) 

810 

811 def get_rebase_state_manager(self) -> "RebaseStateManager": 

812 """Get the appropriate rebase state manager for this repository. 

813 

814 Returns: RebaseStateManager instance 

815 """ 

816 raise NotImplementedError(self.get_rebase_state_manager) 

817 

818 def get_blob_normalizer(self) -> "BlobNormalizer": 

819 """Return a BlobNormalizer object for checkin/checkout operations. 

820 

821 Returns: BlobNormalizer instance 

822 """ 

823 raise NotImplementedError(self.get_blob_normalizer) 

824 

825 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

826 """Read gitattributes for the repository. 

827 

828 Args: 

829 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

830 

831 Returns: 

832 GitAttributes object that can be used to match paths 

833 """ 

834 raise NotImplementedError(self.get_gitattributes) 

835 

836 def get_config_stack(self) -> "StackedConfig": 

837 """Return a config stack for this repository. 

838 

839 This stack accesses the configuration for both this repository 

840 itself (.git/config) and the global configuration, which usually 

841 lives in ~/.gitconfig. 

842 

843 Returns: `Config` instance for this repository 

844 """ 

845 from .config import ConfigFile, StackedConfig 

846 

847 local_config = self.get_config() 

848 backends: list[ConfigFile] = [local_config] 

849 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

850 backends.append(self.get_worktree_config()) 

851 

852 backends += StackedConfig.default_backends() 

853 return StackedConfig(backends, writable=local_config) 

854 

855 def get_shallow(self) -> set[ObjectID]: 

856 """Get the set of shallow commits. 

857 

858 Returns: Set of shallow commits. 

859 """ 

860 f = self.get_named_file("shallow") 

861 if f is None: 

862 return set() 

863 with f: 

864 return {line.strip() for line in f} 

865 

866 def update_shallow( 

867 self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]] 

868 ) -> None: 

869 """Update the list of shallow objects. 

870 

871 Args: 

872 new_shallow: Newly shallow objects 

873 new_unshallow: Newly no longer shallow objects 

874 """ 

875 shallow = self.get_shallow() 

876 if new_shallow: 

877 shallow.update(new_shallow) 

878 if new_unshallow: 

879 shallow.difference_update(new_unshallow) 

880 if shallow: 

881 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

882 else: 

883 self._del_named_file("shallow") 

884 

885 def get_peeled(self, ref: Ref) -> ObjectID: 

886 """Get the peeled value of a ref. 

887 

888 Args: 

889 ref: The refname to peel. 

890 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

891 intermediate tags; if the original ref does not point to a tag, 

892 this will equal the original SHA1. 

893 """ 

894 cached = self.refs.get_peeled(ref) 

895 if cached is not None: 

896 return cached 

897 return peel_sha(self.object_store, self.refs[ref])[1].id 

898 

899 @property 

900 def notes(self) -> "Notes": 

901 """Access notes functionality for this repository. 

902 

903 Returns: 

904 Notes object for accessing notes 

905 """ 

906 from .notes import Notes 

907 

908 return Notes(self.object_store, self.refs) 

909 

910 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs) -> "Walker": 

911 """Obtain a walker for this repository. 

912 

913 Args: 

914 include: Iterable of SHAs of commits to include along with their 

915 ancestors. Defaults to [HEAD] 

916 **kwargs: Additional keyword arguments including: 

917 

918 * exclude: Iterable of SHAs of commits to exclude along with their 

919 ancestors, overriding includes. 

920 * order: ORDER_* constant specifying the order of results. 

921 Anything other than ORDER_DATE may result in O(n) memory usage. 

922 * reverse: If True, reverse the order of output, requiring O(n) 

923 memory. 

924 * max_entries: The maximum number of entries to yield, or None for 

925 no limit. 

926 * paths: Iterable of file or subtree paths to show entries for. 

927 * rename_detector: diff.RenameDetector object for detecting 

928 renames. 

929 * follow: If True, follow path across renames/copies. Forces a 

930 default rename_detector. 

931 * since: Timestamp to list commits after. 

932 * until: Timestamp to list commits before. 

933 * queue_cls: A class to use for a queue of commits, supporting the 

934 iterator protocol. The constructor takes a single argument, the Walker. 

935 

936 Returns: A `Walker` object 

937 """ 

938 from .walk import Walker 

939 

940 if include is None: 

941 include = [self.head()] 

942 

943 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit) 

944 

945 return Walker(self.object_store, include, **kwargs) 

946 

947 def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile": 

948 """Retrieve a Git object by SHA1 or ref. 

949 

950 Args: 

951 name: A Git object SHA1 or a ref name 

952 Returns: A `ShaFile` object, such as a Commit or Blob 

953 Raises: 

954 KeyError: when the specified ref or object does not exist 

955 """ 

956 if not isinstance(name, bytes): 

957 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

958 if len(name) in (20, 40): 

959 try: 

960 return self.object_store[name] 

961 except (KeyError, ValueError): 

962 pass 

963 try: 

964 return self.object_store[self.refs[name]] 

965 except RefFormatError as exc: 

966 raise KeyError(name) from exc 

967 

968 def __contains__(self, name: bytes) -> bool: 

969 """Check if a specific Git object or ref is present. 

970 

971 Args: 

972 name: Git object SHA1 or ref name 

973 """ 

974 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

975 return name in self.object_store or name in self.refs 

976 else: 

977 return name in self.refs 

978 

979 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

980 """Set a ref. 

981 

982 Args: 

983 name: ref name 

984 value: Ref value - either a ShaFile object, or a hex sha 

985 """ 

986 if name.startswith(b"refs/") or name == b"HEAD": 

987 if isinstance(value, ShaFile): 

988 self.refs[name] = value.id 

989 elif isinstance(value, bytes): 

990 self.refs[name] = value 

991 else: 

992 raise TypeError(value) 

993 else: 

994 raise ValueError(name) 

995 

996 def __delitem__(self, name: bytes) -> None: 

997 """Remove a ref. 

998 

999 Args: 

1000 name: Name of the ref to remove 

1001 """ 

1002 if name.startswith(b"refs/") or name == b"HEAD": 

1003 del self.refs[name] 

1004 else: 

1005 raise ValueError(name) 

1006 

1007 def _get_user_identity( 

1008 self, config: "StackedConfig", kind: Optional[str] = None 

1009 ) -> bytes: 

1010 """Determine the identity to use for new commits.""" 

1011 warnings.warn( 

1012 "use get_user_identity() rather than Repo._get_user_identity", 

1013 DeprecationWarning, 

1014 ) 

1015 return get_user_identity(config) 

1016 

1017 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

1018 """Add or modify graftpoints. 

1019 

1020 Args: 

1021 updated_graftpoints: Dict of commit shas to list of parent shas 

1022 """ 

1023 # Simple validation 

1024 for commit, parents in updated_graftpoints.items(): 

1025 for sha in [commit, *parents]: 

1026 check_hexsha(sha, "Invalid graftpoint") 

1027 

1028 self._graftpoints.update(updated_graftpoints) 

1029 

1030 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None: 

1031 """Remove graftpoints. 

1032 

1033 Args: 

1034 to_remove: List of commit shas 

1035 """ 

1036 for sha in to_remove: 

1037 del self._graftpoints[sha] 

1038 

1039 def _read_heads(self, name: str) -> list[bytes]: 

1040 f = self.get_named_file(name) 

1041 if f is None: 

1042 return [] 

1043 with f: 

1044 return [line.strip() for line in f.readlines() if line.strip()] 

1045 

1046 def get_worktree(self) -> "WorkTree": 

1047 """Get the working tree for this repository. 

1048 

1049 Returns: 

1050 WorkTree instance for performing working tree operations 

1051 

1052 Raises: 

1053 NotImplementedError: If the repository doesn't support working trees 

1054 """ 

1055 raise NotImplementedError( 

1056 "Working tree operations not supported by this repository type" 

1057 ) 

1058 

    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp: Optional[float] = None,
        commit_timezone: Optional[int] = None,
        author_timestamp: Optional[float] = None,
        author_timezone: Optional[int] = None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        Deprecated: thin wrapper around ``WorkTree.commit``; use
        ``repo.get_worktree().commit(...)`` directly.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        # All arguments are forwarded unchanged to the working tree.
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )

1122 

1123 

def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    contents = f.read()
    prefix = b"gitdir: "
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    path = contents[len(prefix) :].rstrip(b"\n")
    return path.decode("utf-8")

1137 

1138 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        # Give Exception a message so str()/repr() are informative
        # (the original produced an empty message).
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1149 

1150 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Give Exception a message so str()/repr() are informative
        # (the original produced an empty message).
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension

1161 

1162 

class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Filesystem path of the working copy (or of the control dir when bare).
    path: str
    # True when the repository has no working tree.
    bare: bool
    # On-disk object store located under the common dir's objects/ directory.
    object_store: DiskObjectStore

1184 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if ``root`` does not look like a git repository
          UnsupportedVersion: for repositoryformatversion other than 0/1
          UnsupportedExtension: for unknown entries in [extensions]
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        # Autodetect bareness: a .git file or .git/objects dir means
        # non-bare; objects/ and refs/ directly under root means bare.
        if bare is None:
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a "gitdir:" pointer file (linked worktree or
                # submodule); follow it to the real control directory.
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # A commondir file points a linked worktree back at the main
        # repository's control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        # Only repository format versions 0 and 1 are understood.
        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # Shallow-clone boundaries are treated like graftpoints (no parents).
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None

1306 

1307 def get_worktree(self) -> "WorkTree": 

1308 """Get the working tree for this repository. 

1309 

1310 Returns: 

1311 WorkTree instance for performing working tree operations 

1312 """ 

1313 from .worktree import WorkTree 

1314 

1315 return WorkTree(self, self.path) 

1316 

1317 def _write_reflog( 

1318 self, ref, old_sha, new_sha, committer, timestamp, timezone, message 

1319 ) -> None: 

1320 from .reflog import format_reflog_line 

1321 

1322 path = self._reflog_path(ref) 

1323 try: 

1324 os.makedirs(os.path.dirname(path)) 

1325 except FileExistsError: 

1326 pass 

1327 if committer is None: 

1328 config = self.get_config_stack() 

1329 committer = get_user_identity(config) 

1330 check_user_identity(committer) 

1331 if timestamp is None: 

1332 timestamp = int(time.time()) 

1333 if timezone is None: 

1334 timezone = 0 # FIXME 

1335 with open(path, "ab") as f: 

1336 f.write( 

1337 format_reflog_line( 

1338 old_sha, new_sha, committer, timestamp, timezone, message 

1339 ) 

1340 + b"\n" 

1341 ) 

1342 

1343 def _reflog_path(self, ref: bytes) -> str: 

1344 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1345 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1346 

1347 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1348 return os.path.join(base, "logs", os.fsdecode(ref)) 

1349 

1350 def read_reflog(self, ref): 

1351 """Read reflog entries for a reference. 

1352 

1353 Args: 

1354 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1355 

1356 Yields: 

1357 reflog.Entry objects in chronological order (oldest first) 

1358 """ 

1359 from .reflog import read_reflog 

1360 

1361 path = self._reflog_path(ref) 

1362 try: 

1363 with open(path, "rb") as f: 

1364 yield from read_reflog(f) 

1365 except FileNotFoundError: 

1366 return 

1367 

1368 @classmethod 

1369 def discover(cls, start="."): 

1370 """Iterate parent directories to discover a repository. 

1371 

1372 Return a Repo object for the first parent directory that looks like a 

1373 Git repository. 

1374 

1375 Args: 

1376 start: The directory to start discovery from (defaults to '.') 

1377 """ 

1378 remaining = True 

1379 path = os.path.abspath(start) 

1380 while remaining: 

1381 try: 

1382 return cls(path) 

1383 except NotGitRepository: 

1384 path, remaining = os.path.split(path) 

1385 raise NotGitRepository( 

1386 "No git repository was found at {path}".format(**dict(path=start)) 

1387 ) 

1388 

1389 def controldir(self) -> str: 

1390 """Return the path of the control directory.""" 

1391 return self._controldir 

1392 

1393 def commondir(self) -> str: 

1394 """Return the path of the common directory. 

1395 

1396 For a main working tree, it is identical to controldir(). 

1397 

1398 For a linked working tree, it is the control directory of the 

1399 main working tree. 

1400 """ 

1401 return self._commondir 

1402 

1403 def _determine_file_mode(self) -> bool: 

1404 """Probe the file-system to determine whether permissions can be trusted. 

1405 

1406 Returns: True if permissions can be trusted, False otherwise. 

1407 """ 

1408 fname = os.path.join(self.path, ".probe-permissions") 

1409 with open(fname, "w") as f: 

1410 f.write("") 

1411 

1412 st1 = os.lstat(fname) 

1413 try: 

1414 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1415 except PermissionError: 

1416 return False 

1417 st2 = os.lstat(fname) 

1418 

1419 os.unlink(fname) 

1420 

1421 mode_differs = st1.st_mode != st2.st_mode 

1422 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1423 

1424 return mode_differs and st2_has_exec 

1425 

1426 def _determine_symlinks(self) -> bool: 

1427 """Probe the filesystem to determine whether symlinks can be created. 

1428 

1429 Returns: True if symlinks can be created, False otherwise. 

1430 """ 

1431 # TODO(jelmer): Actually probe disk / look at filesystem 

1432 return sys.platform != "win32" 

1433 

1434 def _put_named_file(self, path: str, contents: bytes) -> None: 

1435 """Write a file to the control dir with the given name and contents. 

1436 

1437 Args: 

1438 path: The path to the file, relative to the control dir. 

1439 contents: A string to write to the file. 

1440 """ 

1441 path = path.lstrip(os.path.sep) 

1442 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1443 f.write(contents) 

1444 

1445 def _del_named_file(self, path: str) -> None: 

1446 try: 

1447 os.unlink(os.path.join(self.controldir(), path)) 

1448 except FileNotFoundError: 

1449 return 

1450 

1451 def get_named_file(self, path, basedir=None): 

1452 """Get a file from the control dir with a specific name. 

1453 

1454 Although the filename should be interpreted as a filename relative to 

1455 the control dir in a disk-based Repo, the object returned need not be 

1456 pointing to a file in that location. 

1457 

1458 Args: 

1459 path: The path to the file, relative to the control dir. 

1460 basedir: Optional argument that specifies an alternative to the 

1461 control dir. 

1462 Returns: An open file object, or None if the file does not exist. 

1463 """ 

1464 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1465 # the dumb web serving code. 

1466 if basedir is None: 

1467 basedir = self.controldir() 

1468 path = path.lstrip(os.path.sep) 

1469 try: 

1470 return open(os.path.join(basedir, path), "rb") 

1471 except FileNotFoundError: 

1472 return None 

1473 

1474 def index_path(self): 

1475 """Return path to the index file.""" 

1476 return os.path.join(self.controldir(), INDEX_FILENAME) 

1477 

1478 def open_index(self) -> "Index": 

1479 """Open the index for this repository. 

1480 

1481 Raises: 

1482 NoIndexPresent: If no index is present 

1483 Returns: The matching `Index` 

1484 """ 

1485 from .index import Index 

1486 

1487 if not self.has_index(): 

1488 raise NoIndexPresent 

1489 

1490 # Check for manyFiles feature configuration 

1491 config = self.get_config_stack() 

1492 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1493 skip_hash = False 

1494 index_version = None 

1495 

1496 if many_files: 

1497 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1498 try: 

1499 index_version_str = config.get(b"index", b"version") 

1500 index_version = int(index_version_str) 

1501 except KeyError: 

1502 index_version = 4 # Default to version 4 for manyFiles 

1503 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1504 else: 

1505 # Check for explicit index settings 

1506 try: 

1507 index_version_str = config.get(b"index", b"version") 

1508 index_version = int(index_version_str) 

1509 except KeyError: 

1510 index_version = None 

1511 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1512 

1513 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1514 

1515 def has_index(self) -> bool: 

1516 """Check if an index is present.""" 

1517 # Bare repos must never have index files; non-bare repos may have a 

1518 # missing index file, which is treated as empty. 

1519 return not self.bare 

1520 

1521 @replace_me(remove_in="0.26.0") 

1522 def stage( 

1523 self, 

1524 fs_paths: Union[ 

1525 str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]] 

1526 ], 

1527 ) -> None: 

1528 """Stage a set of paths. 

1529 

1530 Args: 

1531 fs_paths: List of paths, relative to the repository path 

1532 """ 

1533 return self.get_worktree().stage(fs_paths) 

1534 

1535 @replace_me(remove_in="0.26.0") 

1536 def unstage(self, fs_paths: list[str]) -> None: 

1537 """Unstage specific file in the index. 

1538 

1539 Args: 

1540 fs_paths: a list of files to unstage, 

1541 relative to the repository path. 

1542 """ 

1543 return self.get_worktree().unstage(fs_paths) 

1544 

    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Non-bare clones check out by default.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Configure this repository as the clone's "origin" remote.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror local branches as remote-tracking refs and copy tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Close the half-initialized clone before propagating.
                target.close()
                raise
        except BaseException:
            # Remove the directory we created so a failed clone leaves
            # nothing behind.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1639 

1640 @replace_me(remove_in="0.26.0") 

1641 def reset_index(self, tree: Optional[bytes] = None): 

1642 """Reset the index back to a specific tree. 

1643 

1644 Args: 

1645 tree: Tree SHA to reset to, None for current HEAD tree. 

1646 """ 

1647 return self.get_worktree().reset_index(tree) 

1648 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match at any depth, per git's rules.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything inside that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # Unborn/missing HEAD: no branch to match.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1734 

    def get_worktree_config(self) -> "ConfigFile":
        """Get the worktree-specific config.

        Returns:
          ConfigFile object for the worktree config
        """
        from .config import ConfigFile

        # NOTE(review): this reads config.worktree from commondir(); for a
        # linked worktree git keeps config.worktree in the per-worktree
        # control dir — confirm this location is intentional.
        path = os.path.join(self.commondir(), "config.worktree")
        try:
            # Pass condition matchers for includeIf evaluation
            condition_matchers = self._get_config_condition_matchers()
            return ConfigFile.from_path(path, condition_matchers=condition_matchers)
        except FileNotFoundError:
            # Return an empty config bound to the expected path so callers
            # can write to it later.
            cf = ConfigFile()
            cf.path = path
            return cf

1752 

1753 def get_config(self) -> "ConfigFile": 

1754 """Retrieve the config object. 

1755 

1756 Returns: `ConfigFile` object for the ``.git/config`` file. 

1757 """ 

1758 from .config import ConfigFile 

1759 

1760 path = os.path.join(self._commondir, "config") 

1761 try: 

1762 # Pass condition matchers for includeIf evaluation 

1763 condition_matchers = self._get_config_condition_matchers() 

1764 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1765 except FileNotFoundError: 

1766 ret = ConfigFile() 

1767 ret.path = path 

1768 return ret 

1769 

1770 def get_rebase_state_manager(self): 

1771 """Get the appropriate rebase state manager for this repository. 

1772 

1773 Returns: DiskRebaseStateManager instance 

1774 """ 

1775 import os 

1776 

1777 from .rebase import DiskRebaseStateManager 

1778 

1779 path = os.path.join(self.controldir(), "rebase-merge") 

1780 return DiskRebaseStateManager(path) 

1781 

1782 def get_description(self): 

1783 """Retrieve the description of this repository. 

1784 

1785 Returns: A string describing the repository or None. 

1786 """ 

1787 path = os.path.join(self._controldir, "description") 

1788 try: 

1789 with GitFile(path, "rb") as f: 

1790 return f.read() 

1791 except FileNotFoundError: 

1792 return None 

1793 

1794 def __repr__(self) -> str: 

1795 """Return string representation of this repository.""" 

1796 return f"<Repo at {self.path!r}>" 

1797 

1798 def set_description(self, description) -> None: 

1799 """Set the description for this repository. 

1800 

1801 Args: 

1802 description: Text to set as description for this repository. 

1803 """ 

1804 self._put_named_file("description", description) 

1805 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Args:
          path: Repository root (working copy, or control dir when bare)
          controldir: Control directory to populate
          bare: Whether the new repository is bare
          object_store: Optional object store to use instead of creating one
          config: Optional config used to look up init.defaultBranch
          default_branch: Branch name HEAD should point at (defaults to the
            init.defaultBranch config value, then DEFAULT_BRANCH)
          symlinks: Whether to support symlinks (autodetect when None)
          format: Repository format version (defaults to 0)
        Returns: The newly opened repository instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        # Create the standard control-dir skeleton.
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # Point HEAD at the (still unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1841 

1842 @classmethod 

1843 def init( 

1844 cls, 

1845 path: Union[str, bytes, os.PathLike], 

1846 *, 

1847 mkdir: bool = False, 

1848 config=None, 

1849 default_branch=None, 

1850 symlinks: Optional[bool] = None, 

1851 format: Optional[int] = None, 

1852 ) -> "Repo": 

1853 """Create a new repository. 

1854 

1855 Args: 

1856 path: Path in which to create the repository 

1857 mkdir: Whether to create the directory 

1858 config: Configuration object 

1859 default_branch: Default branch name 

1860 symlinks: Whether to support symlinks 

1861 format: Repository format version (defaults to 0) 

1862 Returns: `Repo` instance 

1863 """ 

1864 path = os.fspath(path) 

1865 if isinstance(path, bytes): 

1866 path = os.fsdecode(path) 

1867 if mkdir: 

1868 os.mkdir(path) 

1869 controldir = os.path.join(path, CONTROLDIR) 

1870 os.mkdir(controldir) 

1871 _set_filesystem_hidden(controldir) 

1872 return cls._init_maybe_bare( 

1873 path, 

1874 controldir, 

1875 False, 

1876 config=config, 

1877 default_branch=default_branch, 

1878 symlinks=symlinks, 

1879 format=format, 

1880 ) 

1881 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier (defaults to the directory basename)
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        # The worktree's .git is a pointer file to its private control dir.
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # gitdir: back-pointer from the control dir to the worktree's .git file.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # commondir: relative path from the worktree control dir back to the
        # main repository's control directory.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the new worktree at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r

1930 

1931 @classmethod 

1932 def init_bare( 

1933 cls, 

1934 path: Union[str, bytes, os.PathLike], 

1935 *, 

1936 mkdir=False, 

1937 object_store=None, 

1938 config=None, 

1939 default_branch=None, 

1940 format: Optional[int] = None, 

1941 ): 

1942 """Create a new bare repository. 

1943 

1944 ``path`` should already exist and be an empty directory. 

1945 

1946 Args: 

1947 path: Path to create bare repository in 

1948 mkdir: Whether to create the directory 

1949 object_store: Object store to use 

1950 config: Configuration object 

1951 default_branch: Default branch name 

1952 format: Repository format version (defaults to 0) 

1953 Returns: a `Repo` instance 

1954 """ 

1955 path = os.fspath(path) 

1956 if isinstance(path, bytes): 

1957 path = os.fsdecode(path) 

1958 if mkdir: 

1959 os.mkdir(path) 

1960 return cls._init_maybe_bare( 

1961 path, 

1962 path, 

1963 True, 

1964 object_store=object_store, 

1965 config=config, 

1966 default_branch=default_branch, 

1967 format=format, 

1968 ) 

1969 

1970 create = init_bare 

1971 

1972 def close(self) -> None: 

1973 """Close any files opened by this repository.""" 

1974 self.object_store.close() 

1975 # Clean up filter context if it was created 

1976 if self.filter_context is not None: 

1977 self.filter_context.close() 

1978 self.filter_context = None 

1979 

1980 def __enter__(self): 

1981 """Enter context manager.""" 

1982 return self 

1983 

1984 def __exit__(self, exc_type, exc_val, exc_tb): 

1985 """Exit context manager and close repository.""" 

1986 self.close() 

1987 

1988 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

1989 """Read .gitattributes file from working tree. 

1990 

1991 Returns: 

1992 Dictionary mapping file patterns to attributes 

1993 """ 

1994 gitattributes = {} 

1995 gitattributes_path = os.path.join(self.path, ".gitattributes") 

1996 

1997 if os.path.exists(gitattributes_path): 

1998 with open(gitattributes_path, "rb") as f: 

1999 for line in f: 

2000 line = line.strip() 

2001 if not line or line.startswith(b"#"): 

2002 continue 

2003 

2004 parts = line.split() 

2005 if len(parts) < 2: 

2006 continue 

2007 

2008 pattern = parts[0] 

2009 attrs = {} 

2010 

2011 for attr in parts[1:]: 

2012 if attr.startswith(b"-"): 

2013 # Unset attribute 

2014 attrs[attr[1:]] = b"false" 

2015 elif b"=" in attr: 

2016 # Set to value 

2017 key, value = attr.split(b"=", 1) 

2018 attrs[key] = value 

2019 else: 

2020 # Set attribute 

2021 attrs[attr] = b"true" 

2022 

2023 gitattributes[pattern] = attrs 

2024 

2025 return gitattributes 

2026 

2027 def get_blob_normalizer(self): 

2028 """Return a BlobNormalizer object.""" 

2029 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry 

2030 

2031 # Get fresh configuration and GitAttributes 

2032 config_stack = self.get_config_stack() 

2033 git_attributes = self.get_gitattributes() 

2034 

2035 # Lazily create FilterContext if needed 

2036 if self.filter_context is None: 

2037 filter_registry = FilterRegistry(config_stack, self) 

2038 self.filter_context = FilterContext(filter_registry) 

2039 else: 

2040 # Refresh the context with current config to handle config changes 

2041 self.filter_context.refresh_config(config_stack) 

2042 

2043 # Return a new FilterBlobNormalizer with the context 

2044 return FilterBlobNormalizer( 

2045 config_stack, git_attributes, filter_context=self.filter_context 

2046 ) 

2047 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from: the ``.gitattributes`` blob in
        the given tree (or HEAD's tree), ``$GIT_DIR/info/attributes``, and the
        working-tree ``.gitattributes``. Later sources are appended after
        earlier ones, so they take precedence when patterns overlap.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Resolve HEAD; peel a tag to the commit it points at.
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD (e.g. empty repo): no attributes from a tree.
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                # Only the top-level .gitattributes entry is consulted here.
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing objects or a non-tree SHA: ignore tree attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)

2114 

2115 @replace_me(remove_in="0.26.0") 

2116 def _sparse_checkout_file_path(self) -> str: 

2117 """Return the path of the sparse-checkout file in this repo's control dir.""" 

2118 return self.get_worktree()._sparse_checkout_file_path() 

2119 

2120 @replace_me(remove_in="0.26.0") 

2121 def configure_for_cone_mode(self) -> None: 

2122 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

2123 return self.get_worktree().configure_for_cone_mode() 

2124 

2125 @replace_me(remove_in="0.26.0") 

2126 def infer_cone_mode(self) -> bool: 

2127 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

2128 return self.get_worktree().infer_cone_mode() 

2129 

2130 @replace_me(remove_in="0.26.0") 

2131 def get_sparse_checkout_patterns(self) -> list[str]: 

2132 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

2133 

2134 Returns: 

2135 A list of patterns. Returns an empty list if the file is missing. 

2136 """ 

2137 return self.get_worktree().get_sparse_checkout_patterns() 

2138 

2139 @replace_me(remove_in="0.26.0") 

2140 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None: 

2141 """Write the given sparse-checkout patterns into info/sparse-checkout. 

2142 

2143 Creates the info/ directory if it does not exist. 

2144 

2145 Args: 

2146 patterns: A list of gitignore-style patterns to store. 

2147 """ 

2148 return self.get_worktree().set_sparse_checkout_patterns(patterns) 

2149 

2150 @replace_me(remove_in="0.26.0") 

2151 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None: 

2152 """Write the given cone-mode directory patterns into info/sparse-checkout. 

2153 

2154 For each directory to include, add an inclusion line that "undoes" the prior 

2155 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

2156 Never add the same line twice. 

2157 """ 

2158 return self.get_worktree().set_cone_mode_patterns(dirs) 

2159 

2160 

class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries are stored as raw argument tuples, in order.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore[arg-type]
        # Maps control-dir-relative paths to file contents (bytes).
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None
        # Created lazily by get_blob_normalizer().
        self.filter_context = None

    def _append_reflog(self, *args) -> None:
        """Record a ref update; used as the refs container's logger callback."""
        self._reflog.append(args)

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
            description: Text to set as description
        """
        self._description = description

    def get_description(self):
        """Get the description of this repository.

        Returns:
            Repository description as bytes, or None if never set
        """
        return self._description

    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # In-memory repos have no filesystem to probe; assume POSIX
        # semantics everywhere except Windows.
        return sys.platform != "win32"

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise.
        """
        # As above: platform check only, no actual probing.
        return sys.platform != "win32"

    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
            path: The path to the file, relative to the control dir.
            contents: A string to write to the file.
        """
        self._named_files[path] = contents

    def _del_named_file(self, path) -> None:
        """Delete a named file from the control dir; missing files are ignored."""
        try:
            del self._named_files[path]
        except KeyError:
            pass

    def get_named_file(self, path, basedir=None):
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-backed Repo, the object returned need not be
        pointing to a file in that location.

        Args:
            path: The path to the file, relative to the control dir.
            basedir: Optional base directory for the path (unused here)
        Returns: An open file object, or None if the file does not exist.
        """
        contents = self._named_files.get(path, None)
        if contents is None:
            return None
        return BytesIO(contents)

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
            NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        return self._config

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)

    def get_blob_normalizer(self):
        """Return a BlobNormalizer object for checkin/checkout operations."""
        from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

        # Get fresh configuration and GitAttributes
        config_stack = self.get_config_stack()
        git_attributes = self.get_gitattributes()

        # Lazily create FilterContext if needed
        if self.filter_context is None:
            filter_registry = FilterRegistry(config_stack, self)
            self.filter_context = FilterContext(filter_registry)
        else:
            # Refresh the context with current config to handle config changes
            self.filter_context.refresh_config(config_stack)

        # Return a new FilterBlobNormalizer with the context
        return FilterBlobNormalizer(
            config_stack, git_attributes, filter_context=self.filter_context
        )

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
            tree: Ignored; accepted for interface compatibility with Repo.
        """
        from .attrs import GitAttributes

        # Memory repos don't have working trees or gitattributes files
        # Return empty GitAttributes
        return GitAttributes([])

    def close(self) -> None:
        """Close any resources opened by this repository."""
        # Clean up filter context if it was created
        if self.filter_context is not None:
            self.filter_context.close()
            self.filter_context = None

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
            message: Commit message (bytes, or a callable taking
                ``(repo, commit)`` and returning bytes)
            committer: Committer fullname
            author: Author fullname
            commit_timestamp: Commit timestamp (defaults to now)
            commit_timezone: Commit timestamp timezone (defaults to GMT)
            author_timestamp: Author timestamp (defaults to commit timestamp)
            author_timezone: Author timestamp timezone (defaults to commit timezone)
            tree: SHA1 of the tree root to use (required for MemoryRepo)
            encoding: Encoding
            ref: Optional ref to commit to (defaults to current branch).
                If None, creates a dangling commit without updating any ref.
            merge_heads: Merge heads
            no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
            sign: GPG Sign the commit (ignored for MemoryRepo)

        Raises:
            ValueError: if tree is missing/malformed or no message is given

        Returns:
            New commit SHA1
        """
        import time

        from .objects import Commit

        # Unlike Repo.do_commit there is no index to commit from, so the
        # tree must be supplied explicitly.
        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author times default to the commit times.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message receives (repo, commit) and must return bytes.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                # Existing ref: update it atomically so a concurrent change
                # between read and write is detected.
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet: create it only if still absent.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id

    @classmethod
    def init_bare(cls, objects, refs, format: Optional[int] = None):
        """Create a new bare repository in memory.

        Args:
            objects: Objects for the new repository,
                as iterable
            refs: Refs as dictionary, mapping names
                to object SHA1s
            format: Repository format version (defaults to 0)
        """
        ret = cls()
        for obj in objects:
            ret.object_store.add_object(obj)
        for refname, sha in refs.items():
            ret.refs.add_if_new(refname, sha)
        ret._init_files(bare=True, format=format)
        return ret