Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

975 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable, Iterator 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 TypeVar, 

46 Union, 

47) 

48 

49if TYPE_CHECKING: 

50 # There are no circular imports here, but we try to defer imports as long 

51 # as possible to reduce start-up time for anything that doesn't need 

52 # these imports. 

53 from .attrs import GitAttributes 

54 from .config import ConditionMatcher, ConfigFile, StackedConfig 

55 from .index import Index 

56 from .line_ending import BlobNormalizer 

57 from .notes import Notes 

58 from .object_store import BaseObjectStore, GraphWalker, UnpackedObject 

59 from .rebase import RebaseStateManager 

60 from .walk import Walker 

61 from .worktree import WorkTree 

62 

63from . import replace_me 

64from .errors import ( 

65 NoIndexPresent, 

66 NotBlobError, 

67 NotCommitError, 

68 NotGitRepository, 

69 NotTagError, 

70 NotTreeError, 

71 RefFormatError, 

72) 

73from .file import GitFile 

74from .hooks import ( 

75 CommitMsgShellHook, 

76 Hook, 

77 PostCommitShellHook, 

78 PostReceiveShellHook, 

79 PreCommitShellHook, 

80) 

81from .object_store import ( 

82 DiskObjectStore, 

83 MemoryObjectStore, 

84 MissingObjectFinder, 

85 ObjectStoreGraphWalker, 

86 PackBasedObjectStore, 

87 find_shallow, 

88 peel_sha, 

89) 

90from .objects import ( 

91 Blob, 

92 Commit, 

93 ObjectID, 

94 ShaFile, 

95 Tag, 

96 Tree, 

97 check_hexsha, 

98 valid_hexsha, 

99) 

100from .pack import generate_unpacked_objects 

101from .refs import ( 

102 ANNOTATED_TAG_SUFFIX, # noqa: F401 

103 LOCAL_BRANCH_PREFIX, 

104 LOCAL_TAG_PREFIX, # noqa: F401 

105 SYMREF, # noqa: F401 

106 DictRefsContainer, 

107 DiskRefsContainer, 

108 InfoRefsContainer, # noqa: F401 

109 Ref, 

110 RefsContainer, 

111 _set_default_branch, 

112 _set_head, 

113 _set_origin_head, 

114 check_ref_format, # noqa: F401 

115 is_per_worktree_ref, 

116 read_packed_refs, # noqa: F401 

117 read_packed_refs_with_peeled, # noqa: F401 

118 serialize_refs, 

119 write_packed_refs, # noqa: F401 

120) 

121 

CONTROLDIR = ".git"
OBJECTDIR = "objects"

# TypeVar bound to ShaFile, used by BaseRepo._get_object to return the
# requested object subclass.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Subdirectories of the control dir, as lists of path components.
# Presumably created by repository-initialization code outside this chunk.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

DEFAULT_BRANCH = b"master"

144 

145 

class InvalidUserIdentity(Exception):
    """Raised when a user identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the offending identity string.

        Args:
          identity: The identity value that failed validation.
        """
        self.identity = identity

152 

153 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined from the host."""

156 

157 

158# TODO(jelmer): Cache? 

159def _get_default_identity() -> tuple[str, str]: 

160 import socket 

161 

162 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

163 username = os.environ.get(name) 

164 if username: 

165 break 

166 else: 

167 username = None 

168 

169 try: 

170 import pwd 

171 except ImportError: 

172 fullname = None 

173 else: 

174 try: 

175 entry = pwd.getpwuid(os.getuid()) # type: ignore 

176 except KeyError: 

177 fullname = None 

178 else: 

179 if getattr(entry, "gecos", None): 

180 fullname = entry.pw_gecos.split(",")[0] 

181 else: 

182 fullname = None 

183 if username is None: 

184 username = entry.pw_name 

185 if not fullname: 

186 if username is None: 

187 raise DefaultIdentityNotFound("no username found") 

188 fullname = username 

189 email = os.environ.get("EMAIL") 

190 if email is None: 

191 if username is None: 

192 raise DefaultIdentityNotFound("no username found") 

193 email = f"{username}@{socket.gethostname()}" 

194 return (fullname, email) 

195 

196 

197def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes: 

198 """Determine the identity to use for new commits. 

199 

200 If kind is set, this first checks 

201 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL. 

202 

203 If those variables are not set, then it will fall back 

204 to reading the user.name and user.email settings from 

205 the specified configuration. 

206 

207 If that also fails, then it will fall back to using 

208 the current users' identity as obtained from the host 

209 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f). 

210 

211 Args: 

212 config: Configuration stack to read from 

213 kind: Optional kind to return identity for, 

214 usually either "AUTHOR" or "COMMITTER". 

215 

216 Returns: 

217 A user identity 

218 """ 

219 user: Optional[bytes] = None 

220 email: Optional[bytes] = None 

221 if kind: 

222 user_uc = os.environ.get("GIT_" + kind + "_NAME") 

223 if user_uc is not None: 

224 user = user_uc.encode("utf-8") 

225 email_uc = os.environ.get("GIT_" + kind + "_EMAIL") 

226 if email_uc is not None: 

227 email = email_uc.encode("utf-8") 

228 if user is None: 

229 try: 

230 user = config.get(("user",), "name") 

231 except KeyError: 

232 user = None 

233 if email is None: 

234 try: 

235 email = config.get(("user",), "email") 

236 except KeyError: 

237 email = None 

238 default_user, default_email = _get_default_identity() 

239 if user is None: 

240 user = default_user.encode("utf-8") 

241 if email is None: 

242 email = default_email.encode("utf-8") 

243 if email.startswith(b"<") and email.endswith(b">"): 

244 email = email[1:-1] 

245 return user + b" <" + email + b">" 

246 

247 

def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``Name <email>`` and contains neither
    NUL nor newline bytes.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    _name, sep, rest = identity.partition(b" <")
    if (
        not sep
        or b">" not in rest
        or b"\0" in identity
        or b"\n" in identity
    ):
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))

264 

265 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoint lines into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Reject anything that isn't a well-formed hex sha.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

297 

298 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into a newline-separated bytestring.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each output line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

318 

319 

320def _set_filesystem_hidden(path: str) -> None: 

321 """Mark path as to be hidden if supported by platform and filesystem. 

322 

323 On win32 uses SetFileAttributesW api: 

324 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

325 """ 

326 if sys.platform == "win32": 

327 import ctypes 

328 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

329 

330 FILE_ATTRIBUTE_HIDDEN = 2 

331 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

332 ("SetFileAttributesW", ctypes.windll.kernel32) 

333 ) 

334 

335 if isinstance(path, bytes): 

336 path = os.fsdecode(path) 

337 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

338 pass # Could raise or log `ctypes.WinError()` here 

339 

340 # Could implement other platform specific filesystem hiding here 

341 

342 

class ParentsProvider:
    """Provider for commit parent information.

    Resolution order: graft overrides, then shallow cut-offs (which have no
    parents), then the commit-graph file when available, and finally the
    commit object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict] = None,
        shallows: Iterable[bytes] = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information mapping commit sha -> list of parent shas
          shallows: Shallow commit SHAs
        """
        self.store = store
        # Bug fix: the previous defaults were the shared mutable objects
        # ``{}`` and ``[]``; use a None sentinel / immutable tuple so each
        # instance gets its own state.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit.

        Args:
          commit_id: SHA of the commit
          commit: Optional pre-loaded commit object matching commit_id

        Returns:
          List of parent commit SHAs
        """
        # Graft entries override the recorded parents entirely.
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        # Shallow boundary commits have their history cut off.
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        return commit.parents

389 

390 

391class BaseRepo: 

392 """Base class for a git repository. 

393 

394 This base class is meant to be used for Repository implementations that e.g. 

395 work on top of a different transport than a standard filesystem path. 

396 

397 Attributes: 

398 object_store: Dictionary-like object for accessing 

399 the objects 

400 refs: Dictionary-like object with the refs in this 

401 repository 

402 """ 

403 

404 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None: 

405 """Open a repository. 

406 

407 This shouldn't be called directly, but rather through one of the 

408 base classes, such as MemoryRepo or Repo. 

409 

410 Args: 

411 object_store: Object store to use 

412 refs: Refs container to use 

413 """ 

414 self.object_store = object_store 

415 self.refs = refs 

416 

417 self._graftpoints: dict[bytes, list[bytes]] = {} 

418 self.hooks: dict[str, Hook] = {} 

419 

420 def _determine_file_mode(self) -> bool: 

421 """Probe the file-system to determine whether permissions can be trusted. 

422 

423 Returns: True if permissions can be trusted, False otherwise. 

424 """ 

425 raise NotImplementedError(self._determine_file_mode) 

426 

427 def _determine_symlinks(self) -> bool: 

428 """Probe the filesystem to determine whether symlinks can be created. 

429 

430 Returns: True if symlinks can be created, False otherwise. 

431 """ 

432 # For now, just mimic the old behaviour 

433 return sys.platform != "win32" 

434 

435 def _init_files( 

436 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

437 ) -> None: 

438 """Initialize a default set of named files.""" 

439 from .config import ConfigFile 

440 

441 self._put_named_file("description", b"Unnamed repository") 

442 f = BytesIO() 

443 cf = ConfigFile() 

444 if format is None: 

445 format = 0 

446 if format not in (0, 1): 

447 raise ValueError(f"Unsupported repository format version: {format}") 

448 cf.set("core", "repositoryformatversion", str(format)) 

449 if self._determine_file_mode(): 

450 cf.set("core", "filemode", True) 

451 else: 

452 cf.set("core", "filemode", False) 

453 

454 if symlinks is None and not bare: 

455 symlinks = self._determine_symlinks() 

456 

457 if symlinks is False: 

458 cf.set("core", "symlinks", symlinks) 

459 

460 cf.set("core", "bare", bare) 

461 cf.set("core", "logallrefupdates", True) 

462 cf.write_to_file(f) 

463 self._put_named_file("config", f.getvalue()) 

464 self._put_named_file(os.path.join("info", "exclude"), b"") 

465 

466 def get_named_file(self, path: str) -> Optional[BinaryIO]: 

467 """Get a file from the control dir with a specific name. 

468 

469 Although the filename should be interpreted as a filename relative to 

470 the control dir in a disk-based Repo, the object returned need not be 

471 pointing to a file in that location. 

472 

473 Args: 

474 path: The path to the file, relative to the control dir. 

475 Returns: An open file object, or None if the file does not exist. 

476 """ 

477 raise NotImplementedError(self.get_named_file) 

478 

479 def _put_named_file(self, path: str, contents: bytes) -> None: 

480 """Write a file to the control dir with the given name and contents. 

481 

482 Args: 

483 path: The path to the file, relative to the control dir. 

484 contents: A string to write to the file. 

485 """ 

486 raise NotImplementedError(self._put_named_file) 

487 

488 def _del_named_file(self, path: str) -> None: 

489 """Delete a file in the control directory with the given name.""" 

490 raise NotImplementedError(self._del_named_file) 

491 

492 def open_index(self) -> "Index": 

493 """Open the index for this repository. 

494 

495 Raises: 

496 NoIndexPresent: If no index is present 

497 Returns: The matching `Index` 

498 """ 

499 raise NotImplementedError(self.open_index) 

500 

501 def fetch( 

502 self, 

503 target: "BaseRepo", 

504 determine_wants: Optional[Callable] = None, 

505 progress: Optional[Callable] = None, 

506 depth: Optional[int] = None, 

507 ) -> dict: 

508 """Fetch objects into another repository. 

509 

510 Args: 

511 target: The target repository 

512 determine_wants: Optional function to determine what refs to 

513 fetch. 

514 progress: Optional progress function 

515 depth: Optional shallow fetch depth 

516 Returns: The local refs 

517 """ 

518 if determine_wants is None: 

519 determine_wants = target.object_store.determine_wants_all 

520 count, pack_data = self.fetch_pack_data( 

521 determine_wants, 

522 target.get_graph_walker(), 

523 progress=progress, 

524 depth=depth, 

525 ) 

526 target.object_store.add_pack_data(count, pack_data, progress) 

527 return self.get_refs() 

528 

529 def fetch_pack_data( 

530 self, 

531 determine_wants: Callable, 

532 graph_walker: "GraphWalker", 

533 progress: Optional[Callable], 

534 *, 

535 get_tagged: Optional[Callable] = None, 

536 depth: Optional[int] = None, 

537 ) -> tuple: 

538 """Fetch the pack data required for a set of revisions. 

539 

540 Args: 

541 determine_wants: Function that takes a dictionary with heads 

542 and returns the list of heads to fetch. 

543 graph_walker: Object that can iterate over the list of revisions 

544 to fetch and has an "ack" method that will be called to acknowledge 

545 that a revision is present. 

546 progress: Simple progress function that will be called with 

547 updated progress strings. 

548 get_tagged: Function that returns a dict of pointed-to sha -> 

549 tag sha for including tags. 

550 depth: Shallow fetch depth 

551 Returns: count and iterator over pack data 

552 """ 

553 missing_objects = self.find_missing_objects( 

554 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

555 ) 

556 if missing_objects is None: 

557 return 0, iter([]) 

558 remote_has = missing_objects.get_remote_has() 

559 object_ids = list(missing_objects) 

560 return len(object_ids), generate_unpacked_objects( 

561 self.object_store, object_ids, progress=progress, other_haves=remote_has 

562 ) 

563 

    def find_missing_objects(
        self,
        determine_wants: Callable,
        graph_walker: "GraphWalker",
        progress: Optional[Callable],
        *,
        get_tagged: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
          when nothing should be sent (shallow short-circuit path)
        """
        # Present the serialized refs to the caller-supplied wants callback.
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before it may be mutated below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow  # type: ignore[attr-defined]
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                """Dummy finder that returns no missing objects."""

                def get_remote_has(self) -> None:
                    """Get remote has (always returns None).

                    Returns:
                      None
                    """
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self) -> Iterator[tuple[bytes, Optional[bytes]]]:
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
              commit: Commit object

            Returns:
              List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

670 

671 def generate_pack_data( 

672 self, 

673 have: Iterable[ObjectID], 

674 want: Iterable[ObjectID], 

675 progress: Optional[Callable[[str], None]] = None, 

676 ofs_delta: Optional[bool] = None, 

677 ) -> tuple[int, Iterator["UnpackedObject"]]: 

678 """Generate pack data objects for a set of wants/haves. 

679 

680 Args: 

681 have: List of SHA1s of objects that should not be sent 

682 want: List of SHA1s of objects that should be sent 

683 ofs_delta: Whether OFS deltas can be included 

684 progress: Optional progress reporting method 

685 """ 

686 return self.object_store.generate_pack_data( 

687 have, 

688 want, 

689 shallow=self.get_shallow(), 

690 progress=progress, 

691 ofs_delta=ofs_delta, 

692 ) 

693 

694 def get_graph_walker( 

695 self, heads: Optional[list[ObjectID]] = None 

696 ) -> ObjectStoreGraphWalker: 

697 """Retrieve a graph walker. 

698 

699 A graph walker is used by a remote repository (or proxy) 

700 to find out which objects are present in this repository. 

701 

702 Args: 

703 heads: Repository heads to use (optional) 

704 Returns: A graph walker object 

705 """ 

706 if heads is None: 

707 heads = [ 

708 sha 

709 for sha in self.refs.as_dict(b"refs/heads").values() 

710 if sha in self.object_store 

711 ] 

712 parents_provider = ParentsProvider(self.object_store) 

713 return ObjectStoreGraphWalker( 

714 heads, 

715 parents_provider.get_parents, 

716 shallow=self.get_shallow(), 

717 update_shallow=self.update_shallow, 

718 ) 

719 

720 def get_refs(self) -> dict[bytes, bytes]: 

721 """Get dictionary with all refs. 

722 

723 Returns: A ``dict`` mapping ref names to SHA1s 

724 """ 

725 return self.refs.as_dict() 

726 

727 def head(self) -> bytes: 

728 """Return the SHA1 pointed at by HEAD.""" 

729 # TODO: move this method to WorkTree 

730 return self.refs[b"HEAD"] 

731 

732 def _get_object(self, sha: bytes, cls: type[T]) -> T: 

733 assert len(sha) in (20, 40) 

734 ret = self.get_object(sha) 

735 if not isinstance(ret, cls): 

736 if cls is Commit: 

737 raise NotCommitError(ret.id) 

738 elif cls is Blob: 

739 raise NotBlobError(ret.id) 

740 elif cls is Tree: 

741 raise NotTreeError(ret.id) 

742 elif cls is Tag: 

743 raise NotTagError(ret.id) 

744 else: 

745 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

746 return ret 

747 

748 def get_object(self, sha: bytes) -> ShaFile: 

749 """Retrieve the object with the specified SHA. 

750 

751 Args: 

752 sha: SHA to retrieve 

753 Returns: A ShaFile object 

754 Raises: 

755 KeyError: when the object can not be found 

756 """ 

757 return self.object_store[sha] 

758 

759 def parents_provider(self) -> ParentsProvider: 

760 """Get a parents provider for this repository. 

761 

762 Returns: 

763 ParentsProvider instance configured with grafts and shallows 

764 """ 

765 return ParentsProvider( 

766 self.object_store, 

767 grafts=self._graftpoints, 

768 shallows=self.get_shallow(), 

769 ) 

770 

771 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]: 

772 """Retrieve the parents of a specific commit. 

773 

774 If the specific commit is a graftpoint, the graft parents 

775 will be returned instead. 

776 

777 Args: 

778 sha: SHA of the commit for which to retrieve the parents 

779 commit: Optional commit matching the sha 

780 Returns: List of parents 

781 """ 

782 return self.parents_provider().get_parents(sha, commit) 

783 

784 def get_config(self) -> "ConfigFile": 

785 """Retrieve the config object. 

786 

787 Returns: `ConfigFile` object for the ``.git/config`` file. 

788 """ 

789 raise NotImplementedError(self.get_config) 

790 

791 def get_worktree_config(self) -> "ConfigFile": 

792 """Retrieve the worktree config object.""" 

793 raise NotImplementedError(self.get_worktree_config) 

794 

795 def get_description(self) -> Optional[str]: 

796 """Retrieve the description for this repository. 

797 

798 Returns: String with the description of the repository 

799 as set by the user. 

800 """ 

801 raise NotImplementedError(self.get_description) 

802 

803 def set_description(self, description: bytes) -> None: 

804 """Set the description for this repository. 

805 

806 Args: 

807 description: Text to set as description for this repository. 

808 """ 

809 raise NotImplementedError(self.set_description) 

810 

811 def get_rebase_state_manager(self) -> "RebaseStateManager": 

812 """Get the appropriate rebase state manager for this repository. 

813 

814 Returns: RebaseStateManager instance 

815 """ 

816 raise NotImplementedError(self.get_rebase_state_manager) 

817 

818 def get_blob_normalizer(self) -> "BlobNormalizer": 

819 """Return a BlobNormalizer object for checkin/checkout operations. 

820 

821 Returns: BlobNormalizer instance 

822 """ 

823 raise NotImplementedError(self.get_blob_normalizer) 

824 

825 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

826 """Read gitattributes for the repository. 

827 

828 Args: 

829 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

830 

831 Returns: 

832 GitAttributes object that can be used to match paths 

833 """ 

834 raise NotImplementedError(self.get_gitattributes) 

835 

836 def get_config_stack(self) -> "StackedConfig": 

837 """Return a config stack for this repository. 

838 

839 This stack accesses the configuration for both this repository 

840 itself (.git/config) and the global configuration, which usually 

841 lives in ~/.gitconfig. 

842 

843 Returns: `Config` instance for this repository 

844 """ 

845 from .config import ConfigFile, StackedConfig 

846 

847 local_config = self.get_config() 

848 backends: list[ConfigFile] = [local_config] 

849 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

850 backends.append(self.get_worktree_config()) 

851 

852 backends += StackedConfig.default_backends() 

853 return StackedConfig(backends, writable=local_config) 

854 

855 def get_shallow(self) -> set[ObjectID]: 

856 """Get the set of shallow commits. 

857 

858 Returns: Set of shallow commits. 

859 """ 

860 f = self.get_named_file("shallow") 

861 if f is None: 

862 return set() 

863 with f: 

864 return {line.strip() for line in f} 

865 

866 def update_shallow( 

867 self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]] 

868 ) -> None: 

869 """Update the list of shallow objects. 

870 

871 Args: 

872 new_shallow: Newly shallow objects 

873 new_unshallow: Newly no longer shallow objects 

874 """ 

875 shallow = self.get_shallow() 

876 if new_shallow: 

877 shallow.update(new_shallow) 

878 if new_unshallow: 

879 shallow.difference_update(new_unshallow) 

880 if shallow: 

881 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

882 else: 

883 self._del_named_file("shallow") 

884 

885 def get_peeled(self, ref: Ref) -> ObjectID: 

886 """Get the peeled value of a ref. 

887 

888 Args: 

889 ref: The refname to peel. 

890 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

891 intermediate tags; if the original ref does not point to a tag, 

892 this will equal the original SHA1. 

893 """ 

894 cached = self.refs.get_peeled(ref) 

895 if cached is not None: 

896 return cached 

897 return peel_sha(self.object_store, self.refs[ref])[1].id 

898 

899 @property 

900 def notes(self) -> "Notes": 

901 """Access notes functionality for this repository. 

902 

903 Returns: 

904 Notes object for accessing notes 

905 """ 

906 from .notes import Notes 

907 

908 return Notes(self.object_store, self.refs) 

909 

910 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs) -> "Walker": 

911 """Obtain a walker for this repository. 

912 

913 Args: 

914 include: Iterable of SHAs of commits to include along with their 

915 ancestors. Defaults to [HEAD] 

916 **kwargs: Additional keyword arguments including: 

917 

918 * exclude: Iterable of SHAs of commits to exclude along with their 

919 ancestors, overriding includes. 

920 * order: ORDER_* constant specifying the order of results. 

921 Anything other than ORDER_DATE may result in O(n) memory usage. 

922 * reverse: If True, reverse the order of output, requiring O(n) 

923 memory. 

924 * max_entries: The maximum number of entries to yield, or None for 

925 no limit. 

926 * paths: Iterable of file or subtree paths to show entries for. 

927 * rename_detector: diff.RenameDetector object for detecting 

928 renames. 

929 * follow: If True, follow path across renames/copies. Forces a 

930 default rename_detector. 

931 * since: Timestamp to list commits after. 

932 * until: Timestamp to list commits before. 

933 * queue_cls: A class to use for a queue of commits, supporting the 

934 iterator protocol. The constructor takes a single argument, the Walker. 

935 

936 Returns: A `Walker` object 

937 """ 

938 from .walk import Walker 

939 

940 if include is None: 

941 include = [self.head()] 

942 

943 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit) 

944 

945 return Walker(self.object_store, include, **kwargs) 

946 

947 def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile": 

948 """Retrieve a Git object by SHA1 or ref. 

949 

950 Args: 

951 name: A Git object SHA1 or a ref name 

952 Returns: A `ShaFile` object, such as a Commit or Blob 

953 Raises: 

954 KeyError: when the specified ref or object does not exist 

955 """ 

956 if not isinstance(name, bytes): 

957 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

958 if len(name) in (20, 40): 

959 try: 

960 return self.object_store[name] 

961 except (KeyError, ValueError): 

962 pass 

963 try: 

964 return self.object_store[self.refs[name]] 

965 except RefFormatError as exc: 

966 raise KeyError(name) from exc 

967 

968 def __contains__(self, name: bytes) -> bool: 

969 """Check if a specific Git object or ref is present. 

970 

971 Args: 

972 name: Git object SHA1 or ref name 

973 """ 

974 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

975 return name in self.object_store or name in self.refs 

976 else: 

977 return name in self.refs 

978 

979 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

980 """Set a ref. 

981 

982 Args: 

983 name: ref name 

984 value: Ref value - either a ShaFile object, or a hex sha 

985 """ 

986 if name.startswith(b"refs/") or name == b"HEAD": 

987 if isinstance(value, ShaFile): 

988 self.refs[name] = value.id 

989 elif isinstance(value, bytes): 

990 self.refs[name] = value 

991 else: 

992 raise TypeError(value) 

993 else: 

994 raise ValueError(name) 

995 

996 def __delitem__(self, name: bytes) -> None: 

997 """Remove a ref. 

998 

999 Args: 

1000 name: Name of the ref to remove 

1001 """ 

1002 if name.startswith(b"refs/") or name == b"HEAD": 

1003 del self.refs[name] 

1004 else: 

1005 raise ValueError(name) 

1006 

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated: use the module-level get_user_identity() instead.

        Args:
          config: Configuration stack to read the identity from
          kind: Unused; kept for backward compatibility of the signature
        Returns: Identity bytestring, as returned by get_user_identity()
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

1016 

1017 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

1018 """Add or modify graftpoints. 

1019 

1020 Args: 

1021 updated_graftpoints: Dict of commit shas to list of parent shas 

1022 """ 

1023 # Simple validation 

1024 for commit, parents in updated_graftpoints.items(): 

1025 for sha in [commit, *parents]: 

1026 check_hexsha(sha, "Invalid graftpoint") 

1027 

1028 self._graftpoints.update(updated_graftpoints) 

1029 

1030 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None: 

1031 """Remove graftpoints. 

1032 

1033 Args: 

1034 to_remove: List of commit shas 

1035 """ 

1036 for sha in to_remove: 

1037 del self._graftpoints[sha] 

1038 

1039 def _read_heads(self, name: str) -> list[bytes]: 

1040 f = self.get_named_file(name) 

1041 if f is None: 

1042 return [] 

1043 with f: 

1044 return [line.strip() for line in f.readlines() if line.strip()] 

1045 

1046 def get_worktree(self) -> "WorkTree": 

1047 """Get the working tree for this repository. 

1048 

1049 Returns: 

1050 WorkTree instance for performing working tree operations 

1051 

1052 Raises: 

1053 NotImplementedError: If the repository doesn't support working trees 

1054 """ 

1055 raise NotImplementedError( 

1056 "Working tree operations not supported by this repository type" 

1057 ) 

1058 

    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp: Optional[float] = None,
        commit_timezone: Optional[int] = None,
        author_timestamp: Optional[float] = None,
        author_timezone: Optional[int] = None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        Deprecated: delegates to ``get_worktree().commit(...)``; scheduled
        for removal (see the ``replace_me`` decorator).

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        # Pure delegation: every keyword is forwarded unchanged to the
        # WorkTree implementation.
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )

1122 

1123 

def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Drop the marker and any trailing newline(s); the rest is the path.
    return contents[len(prefix) :].rstrip(b"\n").decode("utf-8")

1137 

1138 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        # Pass a message to Exception so str(exc) is informative instead of
        # empty; the raw version stays available as .version.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1149 

1150 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc) is informative instead of
        # empty; the raw extension stays available as .extension.
        super().__init__(f"Unsupported extension: {extension!r}")
        self.extension = extension

1161 

1162 

1163class Repo(BaseRepo): 

1164 """A git repository backed by local disk. 

1165 

1166 To open an existing repository, call the constructor with 

1167 the path of the repository. 

1168 

1169 To create a new repository, use the Repo.init class method. 

1170 

1171 Note that a repository object may hold on to resources such 

1172 as file handles for performance reasons; call .close() to free 

1173 up those resources. 

1174 

1175 Attributes: 

1176 path: Path to the working copy (if it exists) or repository control 

1177 directory (if the repository is bare) 

1178 bare: Whether this is a bare repository 

1179 """ 

1180 

1181 path: str 

1182 bare: bool 

1183 object_store: DiskObjectStore 

1184 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.
        Raises:
          NotGitRepository: if no repository layout is found at ``root``
          UnsupportedVersion: for repositoryformatversion other than 0/1
          UnsupportedExtension: for unknown extensions.* config entries
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a ".git" file/dir means non-bare; an objects/ and
            # refs/ directory directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            # A ".git" *file* redirects to the real control dir (worktrees,
            # submodules); otherwise ".git" is the control dir itself.
            if os.path.isfile(hidden_path):
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # "commondir" points a linked worktree back at the main repository's
        # shared control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Missing setting means the original (version 0) format.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any extension we do not understand makes the repo unsafe
                # to operate on; refuse to open it.
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graft information merges entries from info/grafts and the shallow
        # file; both map commit shas to replacement parent lists.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

1303 

1304 def get_worktree(self) -> "WorkTree": 

1305 """Get the working tree for this repository. 

1306 

1307 Returns: 

1308 WorkTree instance for performing working tree operations 

1309 """ 

1310 from .worktree import WorkTree 

1311 

1312 return WorkTree(self, self.path) 

1313 

1314 def _write_reflog( 

1315 self, ref, old_sha, new_sha, committer, timestamp, timezone, message 

1316 ) -> None: 

1317 from .reflog import format_reflog_line 

1318 

1319 path = self._reflog_path(ref) 

1320 try: 

1321 os.makedirs(os.path.dirname(path)) 

1322 except FileExistsError: 

1323 pass 

1324 if committer is None: 

1325 config = self.get_config_stack() 

1326 committer = get_user_identity(config) 

1327 check_user_identity(committer) 

1328 if timestamp is None: 

1329 timestamp = int(time.time()) 

1330 if timezone is None: 

1331 timezone = 0 # FIXME 

1332 with open(path, "ab") as f: 

1333 f.write( 

1334 format_reflog_line( 

1335 old_sha, new_sha, committer, timestamp, timezone, message 

1336 ) 

1337 + b"\n" 

1338 ) 

1339 

1340 def _reflog_path(self, ref: bytes) -> str: 

1341 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1342 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1343 

1344 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1345 return os.path.join(base, "logs", os.fsdecode(ref)) 

1346 

1347 def read_reflog(self, ref): 

1348 """Read reflog entries for a reference. 

1349 

1350 Args: 

1351 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1352 

1353 Yields: 

1354 reflog.Entry objects in chronological order (oldest first) 

1355 """ 

1356 from .reflog import read_reflog 

1357 

1358 path = self._reflog_path(ref) 

1359 try: 

1360 with open(path, "rb") as f: 

1361 yield from read_reflog(f) 

1362 except FileNotFoundError: 

1363 return 

1364 

1365 @classmethod 

1366 def discover(cls, start="."): 

1367 """Iterate parent directories to discover a repository. 

1368 

1369 Return a Repo object for the first parent directory that looks like a 

1370 Git repository. 

1371 

1372 Args: 

1373 start: The directory to start discovery from (defaults to '.') 

1374 """ 

1375 remaining = True 

1376 path = os.path.abspath(start) 

1377 while remaining: 

1378 try: 

1379 return cls(path) 

1380 except NotGitRepository: 

1381 path, remaining = os.path.split(path) 

1382 raise NotGitRepository( 

1383 "No git repository was found at {path}".format(**dict(path=start)) 

1384 ) 

1385 

    def controldir(self) -> str:
        """Return the path of the control directory.

        For a non-bare repository this is the ``.git`` directory (or the
        directory a ``.git`` file points at); for a bare repository it is
        the repository root itself.
        """
        return self._controldir

1389 

    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree (resolved from the ``commondir`` file at open
        time).
        """
        return self._commondir

1399 

1400 def _determine_file_mode(self) -> bool: 

1401 """Probe the file-system to determine whether permissions can be trusted. 

1402 

1403 Returns: True if permissions can be trusted, False otherwise. 

1404 """ 

1405 fname = os.path.join(self.path, ".probe-permissions") 

1406 with open(fname, "w") as f: 

1407 f.write("") 

1408 

1409 st1 = os.lstat(fname) 

1410 try: 

1411 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1412 except PermissionError: 

1413 return False 

1414 st2 = os.lstat(fname) 

1415 

1416 os.unlink(fname) 

1417 

1418 mode_differs = st1.st_mode != st2.st_mode 

1419 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1420 

1421 return mode_differs and st2_has_exec 

1422 

1423 def _determine_symlinks(self) -> bool: 

1424 """Probe the filesystem to determine whether symlinks can be created. 

1425 

1426 Returns: True if symlinks can be created, False otherwise. 

1427 """ 

1428 # TODO(jelmer): Actually probe disk / look at filesystem 

1429 return sys.platform != "win32" 

1430 

1431 def _put_named_file(self, path: str, contents: bytes) -> None: 

1432 """Write a file to the control dir with the given name and contents. 

1433 

1434 Args: 

1435 path: The path to the file, relative to the control dir. 

1436 contents: A string to write to the file. 

1437 """ 

1438 path = path.lstrip(os.path.sep) 

1439 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1440 f.write(contents) 

1441 

1442 def _del_named_file(self, path: str) -> None: 

1443 try: 

1444 os.unlink(os.path.join(self.controldir(), path)) 

1445 except FileNotFoundError: 

1446 return 

1447 

1448 def get_named_file(self, path, basedir=None): 

1449 """Get a file from the control dir with a specific name. 

1450 

1451 Although the filename should be interpreted as a filename relative to 

1452 the control dir in a disk-based Repo, the object returned need not be 

1453 pointing to a file in that location. 

1454 

1455 Args: 

1456 path: The path to the file, relative to the control dir. 

1457 basedir: Optional argument that specifies an alternative to the 

1458 control dir. 

1459 Returns: An open file object, or None if the file does not exist. 

1460 """ 

1461 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1462 # the dumb web serving code. 

1463 if basedir is None: 

1464 basedir = self.controldir() 

1465 path = path.lstrip(os.path.sep) 

1466 try: 

1467 return open(os.path.join(basedir, path), "rb") 

1468 except FileNotFoundError: 

1469 return None 

1470 

    def index_path(self):
        """Return path to the index file.

        The index lives in the per-worktree control directory, not the
        shared common directory.
        """
        return os.path.join(self.controldir(), INDEX_FILENAME)

1474 

1475 def open_index(self) -> "Index": 

1476 """Open the index for this repository. 

1477 

1478 Raises: 

1479 NoIndexPresent: If no index is present 

1480 Returns: The matching `Index` 

1481 """ 

1482 from .index import Index 

1483 

1484 if not self.has_index(): 

1485 raise NoIndexPresent 

1486 

1487 # Check for manyFiles feature configuration 

1488 config = self.get_config_stack() 

1489 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1490 skip_hash = False 

1491 index_version = None 

1492 

1493 if many_files: 

1494 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1495 try: 

1496 index_version_str = config.get(b"index", b"version") 

1497 index_version = int(index_version_str) 

1498 except KeyError: 

1499 index_version = 4 # Default to version 4 for manyFiles 

1500 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1501 else: 

1502 # Check for explicit index settings 

1503 try: 

1504 index_version_str = config.get(b"index", b"version") 

1505 index_version = int(index_version_str) 

1506 except KeyError: 

1507 index_version = None 

1508 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1509 

1510 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1511 

1512 def has_index(self) -> bool: 

1513 """Check if an index is present.""" 

1514 # Bare repos must never have index files; non-bare repos may have a 

1515 # missing index file, which is treated as empty. 

1516 return not self.bare 

1517 

1518 @replace_me(remove_in="0.26.0") 

1519 def stage( 

1520 self, 

1521 fs_paths: Union[ 

1522 str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]] 

1523 ], 

1524 ) -> None: 

1525 """Stage a set of paths. 

1526 

1527 Args: 

1528 fs_paths: List of paths, relative to the repository path 

1529 """ 

1530 return self.get_worktree().stage(fs_paths) 

1531 

1532 @replace_me(remove_in="0.26.0") 

1533 def unstage(self, fs_paths: list[str]) -> None: 

1534 """Unstage specific file in the index. 

1535 

1536 Args: 

1537 fs_paths: a list of files to unstage, 

1538 relative to the repository path. 

1539 """ 

1540 return self.get_worktree().unstage(fs_paths) 

1541 

1542 def clone( 

1543 self, 

1544 target_path, 

1545 *, 

1546 mkdir=True, 

1547 bare=False, 

1548 origin=b"origin", 

1549 checkout=None, 

1550 branch=None, 

1551 progress=None, 

1552 depth: Optional[int] = None, 

1553 symlinks=None, 

1554 ) -> "Repo": 

1555 """Clone this repository. 

1556 

1557 Args: 

1558 target_path: Target path 

1559 mkdir: Create the target directory 

1560 bare: Whether to create a bare repository 

1561 checkout: Whether or not to check-out HEAD after cloning 

1562 origin: Base name for refs in target repository 

1563 cloned from this repository 

1564 branch: Optional branch or tag to be used as HEAD in the new repository 

1565 instead of this repository's HEAD. 

1566 progress: Optional progress function 

1567 depth: Depth at which to fetch 

1568 symlinks: Symlinks setting (default to autodetect) 

1569 Returns: Created repository as `Repo` 

1570 """ 

1571 encoded_path = os.fsencode(self.path) 

1572 

1573 if mkdir: 

1574 os.mkdir(target_path) 

1575 

1576 try: 

1577 if not bare: 

1578 target = Repo.init(target_path, symlinks=symlinks) 

1579 if checkout is None: 

1580 checkout = True 

1581 else: 

1582 if checkout: 

1583 raise ValueError("checkout and bare are incompatible") 

1584 target = Repo.init_bare(target_path) 

1585 

1586 try: 

1587 target_config = target.get_config() 

1588 target_config.set((b"remote", origin), b"url", encoded_path) 

1589 target_config.set( 

1590 (b"remote", origin), 

1591 b"fetch", 

1592 b"+refs/heads/*:refs/remotes/" + origin + b"/*", 

1593 ) 

1594 target_config.write_to_path() 

1595 

1596 ref_message = b"clone: from " + encoded_path 

1597 self.fetch(target, depth=depth) 

1598 target.refs.import_refs( 

1599 b"refs/remotes/" + origin, 

1600 self.refs.as_dict(b"refs/heads"), 

1601 message=ref_message, 

1602 ) 

1603 target.refs.import_refs( 

1604 b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message 

1605 ) 

1606 

1607 head_chain, origin_sha = self.refs.follow(b"HEAD") 

1608 origin_head = head_chain[-1] if head_chain else None 

1609 if origin_sha and not origin_head: 

1610 # set detached HEAD 

1611 target.refs[b"HEAD"] = origin_sha 

1612 else: 

1613 _set_origin_head(target.refs, origin, origin_head) 

1614 head_ref = _set_default_branch( 

1615 target.refs, origin, origin_head, branch, ref_message 

1616 ) 

1617 

1618 # Update target head 

1619 if head_ref: 

1620 head = _set_head(target.refs, head_ref, ref_message) 

1621 else: 

1622 head = None 

1623 

1624 if checkout and head is not None: 

1625 target.get_worktree().reset_index() 

1626 except BaseException: 

1627 target.close() 

1628 raise 

1629 except BaseException: 

1630 if mkdir: 

1631 import shutil 

1632 

1633 shutil.rmtree(target_path) 

1634 raise 

1635 return target 

1636 

1637 @replace_me(remove_in="0.26.0") 

1638 def reset_index(self, tree: Optional[bytes] = None): 

1639 """Reset the index back to a specific tree. 

1640 

1641 Args: 

1642 tree: Tree SHA to reset to, None for current HEAD tree. 

1643 """ 

1644 return self.get_worktree().reset_index(tree) 

1645 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
                pattern: Pattern to match against
                case_sensitive: Whether to match case-sensitively

            Returns:
                True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            # (normalization order matters: slashes first, then the
            # implicit "**/" prefix, then the trailing-"/" expansion).
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
                pattern: Pattern to match against

            Returns:
                True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            # Detached HEAD or unresolvable ref: no branch to match.
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1731 

1732 def get_worktree_config(self) -> "ConfigFile": 

1733 """Get the worktree-specific config. 

1734 

1735 Returns: 

1736 ConfigFile object for the worktree config 

1737 """ 

1738 from .config import ConfigFile 

1739 

1740 path = os.path.join(self.commondir(), "config.worktree") 

1741 try: 

1742 # Pass condition matchers for includeIf evaluation 

1743 condition_matchers = self._get_config_condition_matchers() 

1744 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1745 except FileNotFoundError: 

1746 cf = ConfigFile() 

1747 cf.path = path 

1748 return cf 

1749 

1750 def get_config(self) -> "ConfigFile": 

1751 """Retrieve the config object. 

1752 

1753 Returns: `ConfigFile` object for the ``.git/config`` file. 

1754 """ 

1755 from .config import ConfigFile 

1756 

1757 path = os.path.join(self._commondir, "config") 

1758 try: 

1759 # Pass condition matchers for includeIf evaluation 

1760 condition_matchers = self._get_config_condition_matchers() 

1761 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1762 except FileNotFoundError: 

1763 ret = ConfigFile() 

1764 ret.path = path 

1765 return ret 

1766 

1767 def get_rebase_state_manager(self): 

1768 """Get the appropriate rebase state manager for this repository. 

1769 

1770 Returns: DiskRebaseStateManager instance 

1771 """ 

1772 import os 

1773 

1774 from .rebase import DiskRebaseStateManager 

1775 

1776 path = os.path.join(self.controldir(), "rebase-merge") 

1777 return DiskRebaseStateManager(path) 

1778 

1779 def get_description(self): 

1780 """Retrieve the description of this repository. 

1781 

1782 Returns: A string describing the repository or None. 

1783 """ 

1784 path = os.path.join(self._controldir, "description") 

1785 try: 

1786 with GitFile(path, "rb") as f: 

1787 return f.read() 

1788 except FileNotFoundError: 

1789 return None 

1790 

1791 def __repr__(self) -> str: 

1792 """Return string representation of this repository.""" 

1793 return f"<Repo at {self.path!r}>" 

1794 

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository
            (written verbatim to the ``description`` control file).
        """
        self._put_named_file("description", description)

1802 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared implementation behind init() and init_bare().

        Creates the base directory layout under ``controldir``, sets up the
        object store, points HEAD at the default branch, and writes the
        initial control files.

        Args:
          path: Path of the repository (working copy, or control dir if bare)
          controldir: Path of the control directory to populate
          bare: Whether the new repository is bare
          object_store: Object store to use; created on disk if None
          config: Configuration used to look up init.defaultBranch
          default_branch: Branch name for HEAD; looked up in config if None
          symlinks: Whether to support symlinks (autodetect if None)
          format: Repository format version (defaults to 0)
        Returns: the opened repository instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            # init.defaultBranch from config wins; otherwise fall back to
            # the library default.
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1838 

    @classmethod
    def init(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir: bool = False,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Create a new repository.

        Args:
          path: Path in which to create the repository
          mkdir: Whether to create the directory
          config: Configuration object
          default_branch: Default branch name
          symlinks: Whether to support symlinks
          format: Repository format version (defaults to 0)
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # Non-bare layout: the control directory is ".git" under the
        # working copy, hidden where the platform supports it.
        controldir = os.path.join(path, CONTROLDIR)
        os.mkdir(controldir)
        _set_filesystem_hidden(controldir)
        return cls._init_maybe_bare(
            path,
            controldir,
            False,
            config=config,
            default_branch=default_branch,
            symlinks=symlinks,
            format=format,
        )

1878 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The worktree's ".git" is a file pointing at its control dir
        # under the main repo's worktrees directory.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Back-pointer to the worktree's gitdir file, plus the relative
        # commondir linking back to the main control directory.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the new worktree at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r

1927 

    @classmethod
    def init_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir=False,
        object_store=None,
        config=None,
        default_branch=None,
        format: Optional[int] = None,
    ):
        """Create a new bare repository.

        ``path`` should already exist and be an empty directory.

        Args:
          path: Path to create bare repository in
          mkdir: Whether to create the directory
          object_store: Object store to use
          config: Configuration object
          default_branch: Default branch name
          format: Repository format version (defaults to 0)
        Returns: a `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # Bare layout: the repository root doubles as the control directory.
        return cls._init_maybe_bare(
            path,
            path,
            True,
            object_store=object_store,
            config=config,
            default_branch=default_branch,
            format=format,
        )

    # Alias kept for backwards compatibility.
    create = init_bare

1968 

1969 def close(self) -> None: 

1970 """Close any files opened by this repository.""" 

1971 self.object_store.close() 

1972 

1973 def __enter__(self): 

1974 """Enter context manager.""" 

1975 return self 

1976 

1977 def __exit__(self, exc_type, exc_val, exc_tb): 

1978 """Exit context manager and close repository.""" 

1979 self.close() 

1980 

1981 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

1982 """Read .gitattributes file from working tree. 

1983 

1984 Returns: 

1985 Dictionary mapping file patterns to attributes 

1986 """ 

1987 gitattributes = {} 

1988 gitattributes_path = os.path.join(self.path, ".gitattributes") 

1989 

1990 if os.path.exists(gitattributes_path): 

1991 with open(gitattributes_path, "rb") as f: 

1992 for line in f: 

1993 line = line.strip() 

1994 if not line or line.startswith(b"#"): 

1995 continue 

1996 

1997 parts = line.split() 

1998 if len(parts) < 2: 

1999 continue 

2000 

2001 pattern = parts[0] 

2002 attrs = {} 

2003 

2004 for attr in parts[1:]: 

2005 if attr.startswith(b"-"): 

2006 # Unset attribute 

2007 attrs[attr[1:]] = b"false" 

2008 elif b"=" in attr: 

2009 # Set to value 

2010 key, value = attr.split(b"=", 1) 

2011 attrs[key] = value 

2012 else: 

2013 # Set attribute 

2014 attrs[attr] = b"true" 

2015 

2016 gitattributes[pattern] = attrs 

2017 

2018 return gitattributes 

2019 

2020 def get_blob_normalizer(self): 

2021 """Return a BlobNormalizer object.""" 

2022 from .filters import FilterBlobNormalizer, FilterRegistry 

2023 

2024 # Get proper GitAttributes object 

2025 git_attributes = self.get_gitattributes() 

2026 config_stack = self.get_config_stack() 

2027 

2028 # Create FilterRegistry with repo reference 

2029 filter_registry = FilterRegistry(config_stack, self) 

2030 

2031 # Return FilterBlobNormalizer which handles all filters including line endings 

2032 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

2033 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from: the .gitattributes blob in
        ``tree`` (or HEAD's tree), then .git/info/attributes, then the
        working-tree .gitattributes file.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        # Accumulated in source order; NOTE(review): git itself gives
        # info/attributes the highest precedence, but here the working-tree
        # file is appended after it — confirm GitAttributes resolves later
        # entries as overriding earlier ones and that this order is intended.
        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing/garbage tree SHA: fall through with what we have.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)

2100 

    @replace_me(remove_in="0.26.0")
    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir.

        Deprecated: use ``get_worktree()._sparse_checkout_file_path()``
        instead; this wrapper is scheduled for removal in 0.26.0.
        """
        return self.get_worktree()._sparse_checkout_file_path()

2105 

    @replace_me(remove_in="0.26.0")
    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Deprecated: use ``get_worktree().configure_for_cone_mode()`` instead;
        this wrapper is scheduled for removal in 0.26.0.
        """
        return self.get_worktree().configure_for_cone_mode()

2110 

    @replace_me(remove_in="0.26.0")
    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

        Deprecated: use ``get_worktree().infer_cone_mode()`` instead;
        this wrapper is scheduled for removal in 0.26.0.
        """
        return self.get_worktree().infer_cone_mode()

2115 

    @replace_me(remove_in="0.26.0")
    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Deprecated: use ``get_worktree().get_sparse_checkout_patterns()``
        instead; this wrapper is scheduled for removal in 0.26.0.

        Returns:
            A list of patterns. Returns an empty list if the file is missing.
        """
        return self.get_worktree().get_sparse_checkout_patterns()

2124 

    @replace_me(remove_in="0.26.0")
    def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Deprecated: use ``get_worktree().set_sparse_checkout_patterns()``
        instead; this wrapper is scheduled for removal in 0.26.0.

        Args:
            patterns: A list of gitignore-style patterns to store.
        """
        return self.get_worktree().set_sparse_checkout_patterns(patterns)

2135 

    @replace_me(remove_in="0.26.0")
    def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        For each directory to include, add an inclusion line that "undoes" the prior
        ``!/*/`` 'exclude' that re-includes that directory and everything under it.
        Never add the same line twice.

        Deprecated: use ``get_worktree().set_cone_mode_patterns()`` instead;
        this wrapper is scheduled for removal in 0.26.0.
        """
        return self.get_worktree().set_cone_mode_patterns(dirs)

2145 

2146 

class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries are collected as raw tuples via _append_reflog.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore[arg-type]
        # Control-dir files (e.g. "description"), keyed by relative path.
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None

    def _append_reflog(self, *args) -> None:
        """Record one reflog entry (as a tuple) in the in-memory reflog list."""
        self._reflog.append(args)

2168 

2169 def set_description(self, description) -> None: 

2170 """Set the description for this repository. 

2171 

2172 Args: 

2173 description: Text to set as description 

2174 """ 

2175 self._description = description 

2176 

2177 def get_description(self): 

2178 """Get the description of this repository. 

2179 

2180 Returns: 

2181 Repository description as bytes 

2182 """ 

2183 return self._description 

2184 

2185 def _determine_file_mode(self): 

2186 """Probe the file-system to determine whether permissions can be trusted. 

2187 

2188 Returns: True if permissions can be trusted, False otherwise. 

2189 """ 

2190 return sys.platform != "win32" 

2191 

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        return sys.platform != "win32"

2198 

2199 def _put_named_file(self, path, contents) -> None: 

2200 """Write a file to the control dir with the given name and contents. 

2201 

2202 Args: 

2203 path: The path to the file, relative to the control dir. 

2204 contents: A string to write to the file. 

2205 """ 

2206 self._named_files[path] = contents 

2207 

2208 def _del_named_file(self, path) -> None: 

2209 try: 

2210 del self._named_files[path] 

2211 except KeyError: 

2212 pass 

2213 

2214 def get_named_file(self, path, basedir=None): 

2215 """Get a file from the control dir with a specific name. 

2216 

2217 Although the filename should be interpreted as a filename relative to 

2218 the control dir in a disk-baked Repo, the object returned need not be 

2219 pointing to a file in that location. 

2220 

2221 Args: 

2222 path: The path to the file, relative to the control dir. 

2223 basedir: Optional base directory for the path 

2224 Returns: An open file object, or None if the file does not exist. 

2225 """ 

2226 contents = self._named_files.get(path, None) 

2227 if contents is None: 

2228 return None 

2229 return BytesIO(contents) 

2230 

2231 def open_index(self) -> "Index": 

2232 """Fail to open index for this repo, since it is bare. 

2233 

2234 Raises: 

2235 NoIndexPresent: Raised when no index is present 

2236 """ 

2237 raise NoIndexPresent 

2238 

2239 def get_config(self): 

2240 """Retrieve the config object. 

2241 

2242 Returns: `ConfigFile` object. 

2243 """ 

2244 return self._config 

2245 

2246 def get_rebase_state_manager(self): 

2247 """Get the appropriate rebase state manager for this repository. 

2248 

2249 Returns: MemoryRebaseStateManager instance 

2250 """ 

2251 from .rebase import MemoryRebaseStateManager 

2252 

2253 return MemoryRebaseStateManager(self) 

2254 

2255 def get_blob_normalizer(self): 

2256 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2257 from .filters import FilterBlobNormalizer, FilterRegistry 

2258 

2259 # Get GitAttributes object 

2260 git_attributes = self.get_gitattributes() 

2261 config_stack = self.get_config_stack() 

2262 

2263 # Create FilterRegistry with repo reference 

2264 filter_registry = FilterRegistry(config_stack, self) 

2265 

2266 # Return FilterBlobNormalizer which handles all filters 

2267 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

2268 

2269 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

2270 """Read gitattributes for the repository.""" 

2271 from .attrs import GitAttributes 

2272 

2273 # Memory repos don't have working trees or gitattributes files 

2274 # Return empty GitAttributes 

2275 return GitAttributes([]) 

2276 

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
            message: Commit message (bytes, or a callable taking
                (repo, commit) and returning bytes)
            committer: Committer fullname
            author: Author fullname
            commit_timestamp: Commit timestamp (defaults to now)
            commit_timezone: Commit timestamp timezone (defaults to GMT)
            author_timestamp: Author timestamp (defaults to commit timestamp)
            author_timezone: Author timestamp timezone (defaults to commit timezone)
            tree: SHA1 of the tree root to use (required for MemoryRepo)
            encoding: Encoding
            ref: Optional ref to commit to (defaults to current branch).
                If None, creates a dangling commit without updating any ref.
            merge_heads: Merge heads
            no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
            sign: GPG Sign the commit (ignored for MemoryRepo)

        Raises:
            ValueError: if ``tree`` is missing or malformed, or no message
                could be determined.

        Returns:
            New commit SHA1
        """
        import time

        from .objects import Commit

        # Unlike a disk-backed repo, there is no index to build a tree from,
        # so the caller must supply one explicitly.
        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            # Fall back to the configured committer identity.
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # 0 == UTC offset (GMT).
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author time defaults to the commit time.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                # No configured encoding; leave the header out entirely.
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message is invoked with the nearly-complete commit so it
        # can inspect metadata before producing the message text.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                old_head = self.refs[ref]
                # Existing ref: current head becomes the first parent, and
                # the ref is updated only if it still points at old_head
                # (compare-and-swap to detect concurrent updates).
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet (e.g. first commit on a branch):
                # create it, but only if nobody created it meanwhile.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id

2406 

2407 @classmethod 

2408 def init_bare(cls, objects, refs, format: Optional[int] = None): 

2409 """Create a new bare repository in memory. 

2410 

2411 Args: 

2412 objects: Objects for the new repository, 

2413 as iterable 

2414 refs: Refs as dictionary, mapping names 

2415 to object SHA1s 

2416 format: Repository format version (defaults to 0) 

2417 """ 

2418 ret = cls() 

2419 for obj in objects: 

2420 ret.object_store.add_object(obj) 

2421 for refname, sha in refs.items(): 

2422 ret.refs.add_if_new(refname, sha) 

2423 ret._init_files(bare=True, format=format) 

2424 return ret