Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

970 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable, Iterator 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 TypeVar, 

46 Union, 

47) 

48 

49if TYPE_CHECKING: 

50 # There are no circular imports here, but we try to defer imports as long 

51 # as possible to reduce start-up time for anything that doesn't need 

52 # these imports. 

53 from .attrs import GitAttributes 

54 from .config import ConditionMatcher, ConfigFile, StackedConfig 

55 from .index import Index 

56 from .line_ending import BlobNormalizer 

57 from .notes import Notes 

58 from .object_store import BaseObjectStore, GraphWalker, UnpackedObject 

59 from .rebase import RebaseStateManager 

60 from .walk import Walker 

61 from .worktree import WorkTree 

62 

63from . import replace_me 

64from .errors import ( 

65 NoIndexPresent, 

66 NotBlobError, 

67 NotCommitError, 

68 NotGitRepository, 

69 NotTagError, 

70 NotTreeError, 

71 RefFormatError, 

72) 

73from .file import GitFile 

74from .hooks import ( 

75 CommitMsgShellHook, 

76 Hook, 

77 PostCommitShellHook, 

78 PostReceiveShellHook, 

79 PreCommitShellHook, 

80) 

81from .object_store import ( 

82 DiskObjectStore, 

83 MemoryObjectStore, 

84 MissingObjectFinder, 

85 ObjectStoreGraphWalker, 

86 PackBasedObjectStore, 

87 find_shallow, 

88 peel_sha, 

89) 

90from .objects import ( 

91 Blob, 

92 Commit, 

93 ObjectID, 

94 ShaFile, 

95 Tag, 

96 Tree, 

97 check_hexsha, 

98 valid_hexsha, 

99) 

100from .pack import generate_unpacked_objects 

101from .refs import ( 

102 ANNOTATED_TAG_SUFFIX, # noqa: F401 

103 LOCAL_BRANCH_PREFIX, 

104 LOCAL_TAG_PREFIX, # noqa: F401 

105 SYMREF, # noqa: F401 

106 DictRefsContainer, 

107 DiskRefsContainer, 

108 InfoRefsContainer, # noqa: F401 

109 Ref, 

110 RefsContainer, 

111 _set_default_branch, 

112 _set_head, 

113 _set_origin_head, 

114 check_ref_format, # noqa: F401 

115 read_packed_refs, # noqa: F401 

116 read_packed_refs_with_peeled, # noqa: F401 

117 serialize_refs, 

118 write_packed_refs, # noqa: F401 

119) 

120 

# Type variable used by helpers that return a specific ShaFile subclass.
T = TypeVar("T", bound="ShaFile")

# Well-known file and directory names used by this module.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"

# Names related to linked worktrees (presumably files/directories inside
# the control directory -- confirm against the Repo implementation).
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Each entry is a list of path components.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used when none is specified.
DEFAULT_BRANCH = b"master"

143 

144 

class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'.

    Attributes:
      identity: The offending identity, as text.
    """

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
          identity: The identity string that failed validation.
        """
        # Forward the identity to Exception so that str(exc), repr(exc) and
        # exc.args are informative instead of empty.
        super().__init__(identity)
        self.identity = identity

151 

152 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be worked out."""

155 

156 

157# TODO(jelmer): Cache? 

158def _get_default_identity() -> tuple[str, str]: 

159 import socket 

160 

161 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

162 username = os.environ.get(name) 

163 if username: 

164 break 

165 else: 

166 username = None 

167 

168 try: 

169 import pwd 

170 except ImportError: 

171 fullname = None 

172 else: 

173 try: 

174 entry = pwd.getpwuid(os.getuid()) # type: ignore 

175 except KeyError: 

176 fullname = None 

177 else: 

178 if getattr(entry, "gecos", None): 

179 fullname = entry.pw_gecos.split(",")[0] 

180 else: 

181 fullname = None 

182 if username is None: 

183 username = entry.pw_name 

184 if not fullname: 

185 if username is None: 

186 raise DefaultIdentityNotFound("no username found") 

187 fullname = username 

188 email = os.environ.get("EMAIL") 

189 if email is None: 

190 if username is None: 

191 raise DefaultIdentityNotFound("no username found") 

192 email = f"{username}@{socket.gethostname()}" 

193 return (fullname, email) 

194 

195 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current user's identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f)).
    The host is only consulted when the name or the email is still
    unknown at that point.

    Args:
      config: Configuration stack to read from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity of the form ``b"name <email>"``
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Last resort only: the host lookup may raise DefaultIdentityNotFound
        # and queries the hostname, neither of which is wanted when the
        # identity is already fully specified.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Tolerate emails that are already wrapped in angle brackets
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

245 

246 

def check_user_identity(identity: bytes) -> None:
    """Check that a user identity has the expected ``name <email>`` shape.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: if there is no " <" separator, no ">" after it,
        or the identity contains a NUL byte or a newline
    """
    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    unterminated = b">" not in remainder
    has_forbidden_byte = b"\0" in identity or b"\n" in identity
    if unterminated or has_forbidden_byte:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))

263 

264 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Blank (empty or whitespace-only) lines are skipped.

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split()
        if not fields:
            # A blank line carries no graft; indexing it used to raise
            # IndexError.
            continue

        commit, *parents = fields

        for sha in fields:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

296 

297 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Render a graft dictionary as newline-separated graft lines.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Lines are joined with a newline; no trailing newline is added.

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    return b"\n".join(
        b" ".join([commit, *parents]) if parents else commit
        for commit, parents in graftpoints.items()
    )

317 

318 

319def _set_filesystem_hidden(path: str) -> None: 

320 """Mark path as to be hidden if supported by platform and filesystem. 

321 

322 On win32 uses SetFileAttributesW api: 

323 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

324 """ 

325 if sys.platform == "win32": 

326 import ctypes 

327 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

328 

329 FILE_ATTRIBUTE_HIDDEN = 2 

330 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

331 ("SetFileAttributesW", ctypes.windll.kernel32) 

332 ) 

333 

334 if isinstance(path, bytes): 

335 path = os.fsdecode(path) 

336 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

337 pass # Could raise or log `ctypes.WinError()` here 

338 

339 # Could implement other platform specific filesystem hiding here 

340 

341 

class ParentsProvider:
    """Provider for commit parent information.

    Parents are resolved, in order, from graft overrides, the set of shallow
    commits (which have no parents), the store's commit graph if it has one,
    and finally the commit object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict] = None,
        shallows: Optional[Iterable[bytes]] = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information (commit id -> list of parent ids); the
            mapping is kept by reference, not copied
          shallows: Shallow commit SHAs
        """
        self.store = store
        # A fresh dict per instance: a shared ``{}`` default would leak
        # grafts between unrelated providers if one of them mutated it.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set() if shallows is None else set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional already-loaded commit matching commit_id, used to
            avoid a store lookup on the fallback path

        Returns:
          List of parent commit SHAs
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        return commit.parents

388 

389 

390class BaseRepo: 

391 """Base class for a git repository. 

392 

393 This base class is meant to be used for Repository implementations that e.g. 

394 work on top of a different transport than a standard filesystem path. 

395 

396 Attributes: 

397 object_store: Dictionary-like object for accessing 

398 the objects 

399 refs: Dictionary-like object with the refs in this 

400 repository 

401 """ 

402 

def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
    """Set up the shared state of a repository.

    Not meant to be called directly; go through one of the concrete
    classes, such as MemoryRepo or Repo.

    Args:
      object_store: Object store to use
      refs: Refs container to use
    """
    # Start with no graft overrides and no hooks registered
    self._graftpoints: dict[bytes, list[bytes]] = {}
    self.hooks: dict[str, Hook] = {}

    self.object_store = object_store
    self.refs = refs

418 

419 def _determine_file_mode(self) -> bool: 

420 """Probe the file-system to determine whether permissions can be trusted. 

421 

422 Returns: True if permissions can be trusted, False otherwise. 

423 """ 

424 raise NotImplementedError(self._determine_file_mode) 

425 

426 def _determine_symlinks(self) -> bool: 

427 """Probe the filesystem to determine whether symlinks can be created. 

428 

429 Returns: True if symlinks can be created, False otherwise. 

430 """ 

431 # For now, just mimic the old behaviour 

432 return sys.platform != "win32" 

433 

def _init_files(
    self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
) -> None:
    """Write the default named files of a new repository.

    Writes "description", "config" and an empty "info/exclude" through
    ``_put_named_file``.

    Args:
      bare: Value stored as core.bare
      symlinks: Whether symlinks are supported; when None and the repository
        is not bare, ``_determine_symlinks`` is consulted. core.symlinks is
        only written when the value is False.
      format: Repository format version, 0 or 1 (None means 0)

    Raises:
      ValueError: if format is neither 0 nor 1
    """
    from .config import ConfigFile

    self._put_named_file("description", b"Unnamed repository")

    buffer = BytesIO()
    config = ConfigFile()
    version = 0 if format is None else format
    if version not in (0, 1):
        raise ValueError(f"Unsupported repository format version: {version}")
    config.set("core", "repositoryformatversion", str(version))
    config.set("core", "filemode", bool(self._determine_file_mode()))

    if symlinks is None and not bare:
        symlinks = self._determine_symlinks()
    if symlinks is False:
        config.set("core", "symlinks", False)

    config.set("core", "bare", bare)
    config.set("core", "logallrefupdates", True)
    config.write_to_file(buffer)

    self._put_named_file("config", buffer.getvalue())
    self._put_named_file(os.path.join("info", "exclude"), b"")

464 

def get_named_file(self, path: str) -> Optional[BinaryIO]:
    """Open a named file from the control dir.

    The name is interpreted relative to the control dir of a disk-based
    Repo, but the returned object does not have to be backed by a file in
    that location.

    Not implemented at this level.

    Args:
      path: The path to the file, relative to the control dir.
    Returns: An open file object, or None if the file does not exist.
    """
    raise NotImplementedError(self.get_named_file)

477 

478 def _put_named_file(self, path: str, contents: bytes) -> None: 

479 """Write a file to the control dir with the given name and contents. 

480 

481 Args: 

482 path: The path to the file, relative to the control dir. 

483 contents: A string to write to the file. 

484 """ 

485 raise NotImplementedError(self._put_named_file) 

486 

487 def _del_named_file(self, path: str) -> None: 

488 """Delete a file in the control directory with the given name.""" 

489 raise NotImplementedError(self._del_named_file) 

490 

def open_index(self) -> "Index":
    """Open this repository's index.

    Not implemented at this level.

    Raises:
      NoIndexPresent: If no index is present
    Returns: The matching `Index`
    """
    raise NotImplementedError(self.open_index)

499 

def fetch(
    self,
    target: "BaseRepo",
    determine_wants: Optional[Callable] = None,
    progress: Optional[Callable] = None,
    depth: Optional[int] = None,
) -> dict:
    """Copy objects from this repository into another one.

    Args:
      target: The target repository
      determine_wants: Optional function to determine what refs to
        fetch; defaults to the target object store's
        ``determine_wants_all``.
      progress: Optional progress function
      depth: Optional shallow fetch depth
    Returns: The refs of this (the source) repository
    """
    wants_selector = determine_wants
    if wants_selector is None:
        wants_selector = target.object_store.determine_wants_all

    walker = target.get_graph_walker()
    count, pack_data = self.fetch_pack_data(
        wants_selector, walker, progress=progress, depth=depth
    )
    target.object_store.add_pack_data(count, pack_data, progress)
    return self.get_refs()

527 

def fetch_pack_data(
    self,
    determine_wants: Callable,
    graph_walker: "GraphWalker",
    progress: Optional[Callable],
    *,
    get_tagged: Optional[Callable] = None,
    depth: Optional[int] = None,
) -> tuple:
    """Produce the pack data needed for a set of revisions.

    Args:
      determine_wants: Function that takes a dictionary with heads
        and returns the list of heads to fetch.
      graph_walker: Object that can iterate over the list of revisions
        to fetch and has an "ack" method that will be called to acknowledge
        that a revision is present.
      progress: Simple progress function that will be called with
        updated progress strings.
      get_tagged: Function that returns a dict of pointed-to sha ->
        tag sha for including tags.
      depth: Shallow fetch depth
    Returns: count and iterator over pack data; ``(0, <empty iterator>)``
      when ``find_missing_objects`` returns None
    """
    finder = self.find_missing_objects(
        determine_wants,
        graph_walker,
        progress,
        get_tagged=get_tagged,
        depth=depth,
    )
    if finder is None:
        return 0, iter([])

    remote_has = finder.get_remote_has()
    object_ids = list(finder)
    unpacked = generate_unpacked_objects(
        self.object_store, object_ids, progress=progress, other_haves=remote_has
    )
    return len(object_ids), unpacked

562 

def find_missing_objects(
    self,
    determine_wants: Callable,
    graph_walker: "GraphWalker",
    progress: Optional[Callable],
    *,
    get_tagged: Optional[Callable] = None,
    depth: Optional[int] = None,
) -> Optional[MissingObjectFinder]:
    """Fetch the missing objects required for a set of revisions.

    Args:
      determine_wants: Function that takes a dictionary with heads
        and returns the list of heads to fetch.
      graph_walker: Object that can iterate over the list of revisions
        to fetch and has an "ack" method that will be called to acknowledge
        that a revision is present.
      progress: Simple progress function that will be called with
        updated progress strings.
      get_tagged: Function that returns a dict of pointed-to sha ->
        tag sha for including tags.
      depth: Shallow fetch depth
    Returns: iterator over objects, with __len__ implemented, or None when
      nothing is wanted and shallow information is involved (no pack is
      sent in that case)
    Raises:
      TypeError: if determine_wants does not return a list
    """
    refs = serialize_refs(self.object_store, self.get_refs())

    wants = determine_wants(refs)
    if not isinstance(wants, list):
        raise TypeError("determine_wants() did not return a list")

    # Snapshot (copy) of the walker's shallow set before it gets updated
    current_shallow = set(getattr(graph_walker, "shallow", set()))

    if depth not in (None, 0):
        assert depth is not None
        shallow, not_shallow = find_shallow(self.object_store, wants, depth)
        # Bind unshallow on every path: it is read further down even when the
        # walker has no "shallow" attribute, which used to raise
        # UnboundLocalError.
        unshallow = not_shallow & current_shallow
        # Only update if graph_walker has shallow attribute
        if hasattr(graph_walker, "shallow"):
            graph_walker.shallow.update(shallow - not_shallow)
            new_shallow = graph_walker.shallow - current_shallow
            graph_walker.unshallow = unshallow  # type: ignore[attr-defined]
            if hasattr(graph_walker, "update_shallow"):
                graph_walker.update_shallow(new_shallow, unshallow)
    else:
        unshallow = getattr(graph_walker, "unshallow", set())

    if wants == []:
        # TODO(dborowitz): find a way to short-circuit that doesn't change
        # this interface.

        if getattr(graph_walker, "shallow", set()) or unshallow:
            # Do not send a pack in shallow short-circuit path
            return None

        class DummyMissingObjectFinder:
            """Dummy finder that returns no missing objects."""

            def get_remote_has(self) -> None:
                """Get remote has (always returns None).

                Returns:
                  None
                """
                return None

            def __len__(self) -> int:
                return 0

            def __iter__(self) -> Iterator[tuple[bytes, Optional[bytes]]]:
                yield from []

        return DummyMissingObjectFinder()  # type: ignore

    # If the graph walker is set up with an implementation that can
    # ACK/NAK to the wire, it will write data to the client through
    # this call as a side-effect.
    haves = self.object_store.find_common_revisions(graph_walker)

    # Deal with shallow requests separately because the haves do
    # not reflect what objects are missing
    if getattr(graph_walker, "shallow", set()) or unshallow:
        # TODO: filter the haves commits from iter_shas. the specific
        # commits aren't missing.
        haves = []

    parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

    def get_parents(commit: Commit) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
          commit: Commit object

        Returns:
          List of parent commit SHAs
        """
        return parents_provider.get_parents(commit.id, commit)

    return MissingObjectFinder(
        self.object_store,
        haves=haves,
        wants=wants,
        shallow=getattr(graph_walker, "shallow", set()),
        progress=progress,
        get_tagged=get_tagged,
        get_parents=get_parents,
    )

669 

def generate_pack_data(
    self,
    have: Iterable[ObjectID],
    want: Iterable[ObjectID],
    progress: Optional[Callable[[str], None]] = None,
    ofs_delta: Optional[bool] = None,
) -> tuple[int, Iterator["UnpackedObject"]]:
    """Build pack data objects for the given wants/haves.

    Delegates to the object store, passing this repository's shallow
    commits along.

    Args:
      have: List of SHA1s of objects that should not be sent
      want: List of SHA1s of objects that should be sent
      progress: Optional progress reporting method
      ofs_delta: Whether OFS deltas can be included
    Returns: Whatever the object store's ``generate_pack_data`` returns
    """
    store = self.object_store
    return store.generate_pack_data(
        have,
        want,
        shallow=self.get_shallow(),
        progress=progress,
        ofs_delta=ofs_delta,
    )

692 

def get_graph_walker(
    self, heads: Optional[list[ObjectID]] = None
) -> ObjectStoreGraphWalker:
    """Build a graph walker for this repository.

    A graph walker is used by a remote repository (or proxy)
    to find out which objects are present in this repository.

    Args:
      heads: Repository heads to use (optional); defaults to the values of
        all refs under refs/heads that are present in the object store
    Returns: A graph walker object
    """
    if heads is None:
        heads = []
        for sha in self.refs.as_dict(b"refs/heads").values():
            if sha in self.object_store:
                heads.append(sha)
    provider = ParentsProvider(self.object_store)
    return ObjectStoreGraphWalker(
        heads,
        provider.get_parents,
        shallow=self.get_shallow(),
        update_shallow=self.update_shallow,
    )

718 

def get_refs(self) -> dict[bytes, bytes]:
    """Return every ref of this repository.

    Returns: A ``dict`` mapping ref names to SHA1s
    """
    all_refs = self.refs.as_dict()
    return all_refs

725 

def head(self) -> bytes:
    """Look up HEAD in the refs container and return its SHA1."""
    # TODO: move this method to WorkTree
    head_sha = self.refs[b"HEAD"]
    return head_sha

730 

def _get_object(self, sha: bytes, cls: type[T]) -> T:
    """Fetch an object and insist that it is an instance of cls.

    Args:
      sha: SHA of the object, 20 or 40 bytes long
      cls: Expected object class
    Returns: The object, which is an instance of cls
    Raises:
      NotCommitError, NotBlobError, NotTreeError, NotTagError: when the
        object is not of the expected Commit/Blob/Tree/Tag class
      Exception: when cls is none of those classes and does not match
    """
    assert len(sha) in (20, 40)
    obj = self.get_object(sha)
    if isinstance(obj, cls):
        return obj
    # Pick the dedicated error for the class the caller asked for
    error_by_class = (
        (Commit, NotCommitError),
        (Blob, NotBlobError),
        (Tree, NotTreeError),
        (Tag, NotTagError),
    )
    for expected, error in error_by_class:
        if cls is expected:
            raise error(obj.id)
    raise Exception(f"Type invalid: {obj.type_name!r} != {cls.type_name!r}")

746 

def get_object(self, sha: bytes) -> ShaFile:
    """Look up an object in the object store by SHA.

    Args:
      sha: SHA to retrieve
    Returns: A ShaFile object
    Raises:
      KeyError: when the object can not be found
    """
    store = self.object_store
    return store[sha]

757 

def parents_provider(self) -> ParentsProvider:
    """Create a parents provider for this repository.

    A new provider is built on every call, from the current graftpoints
    and shallow commits.

    Returns:
      ParentsProvider instance configured with grafts and shallows
    """
    shallow_commits = self.get_shallow()
    return ParentsProvider(
        self.object_store, grafts=self._graftpoints, shallows=shallow_commits
    )

769 

def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
    """Look up the parents of a commit.

    Graftpoints take precedence: for a grafted commit the graft parents
    are returned instead of the recorded ones.

    Args:
      sha: SHA of the commit for which to retrieve the parents
      commit: Optional commit matching the sha
    Returns: List of parents
    """
    provider = self.parents_provider()
    return provider.get_parents(sha, commit)

782 

def get_config(self) -> "ConfigFile":
    """Return the repository's config object.

    Not implemented at this level.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    """
    raise NotImplementedError(self.get_config)

789 

def get_worktree_config(self) -> "ConfigFile":
    """Return the worktree config object.

    Not implemented at this level.
    """
    raise NotImplementedError(self.get_worktree_config)

793 

def get_description(self) -> Optional[str]:
    """Return the description of this repository.

    Not implemented at this level.

    Returns: String with the description of the repository
      as set by the user.
    """
    raise NotImplementedError(self.get_description)

801 

def set_description(self, description: bytes) -> None:
    """Change the description of this repository.

    Not implemented at this level.

    Args:
      description: Text to set as description for this repository.
    """
    raise NotImplementedError(self.set_description)

809 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Return the rebase state manager suited to this repository.

    Not implemented at this level.

    Returns: RebaseStateManager instance
    """
    raise NotImplementedError(self.get_rebase_state_manager)

816 

def get_blob_normalizer(self) -> "BlobNormalizer":
    """Return the BlobNormalizer used for checkin/checkout operations.

    Not implemented at this level.

    Returns: BlobNormalizer instance
    """
    raise NotImplementedError(self.get_blob_normalizer)

823 

def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
    """Load the gitattributes of the repository.

    Not implemented at this level.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths
    """
    raise NotImplementedError(self.get_gitattributes)

834 

def get_config_stack(self) -> "StackedConfig":
    """Build the config stack for this repository.

    The stack starts with the repository's own configuration
    (.git/config), adds the worktree configuration when
    extensions.worktreeconfig is enabled, and ends with the default
    backends (the global configuration, which usually lives in
    ~/.gitconfig). Writes go to the repository's own configuration.

    Returns: `Config` instance for this repository
    """
    from .config import ConfigFile, StackedConfig

    repo_config = self.get_config()
    layers: list[ConfigFile] = [repo_config]
    worktree_config_enabled = repo_config.get_boolean(
        (b"extensions",), b"worktreeconfig", False
    )
    if worktree_config_enabled:
        layers.append(self.get_worktree_config())

    layers.extend(StackedConfig.default_backends())
    return StackedConfig(layers, writable=repo_config)

853 

def get_shallow(self) -> "set[ObjectID]":
    """Get the set of shallow commits.

    Reads the "shallow" named file, one SHA per line. Surrounding
    whitespace is stripped and blank lines are ignored, so that an empty
    line never shows up as an empty "SHA" in the result.

    Returns: Set of shallow commits; empty when there is no shallow file.
    """
    f = self.get_named_file("shallow")
    if f is None:
        return set()
    with f:
        stripped = (line.strip() for line in f)
        return {sha for sha in stripped if sha}

864 

def update_shallow(
    self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]]
) -> None:
    """Update the list of shallow objects.

    The resulting set is written to the "shallow" named file, one SHA per
    line in sorted order so the file content is deterministic. When the set
    ends up empty the file is deleted instead.

    Args:
      new_shallow: Newly shallow objects
      new_unshallow: Newly no longer shallow objects
    """
    shallow = self.get_shallow()
    if new_shallow:
        shallow.update(new_shallow)
    if new_unshallow:
        shallow.difference_update(new_unshallow)
    if shallow:
        # Sort: set iteration order is arbitrary and would make the file
        # differ between otherwise identical runs.
        self._put_named_file(
            "shallow", b"".join(sha + b"\n" for sha in sorted(shallow))
        )
    else:
        self._del_named_file("shallow")

883 

def get_peeled(self, ref: Ref) -> ObjectID:
    """Resolve a ref to its fully peeled SHA1.

    The refs container is asked first; only when it has no peeled value
    is the object store consulted.

    Args:
      ref: The refname to peel.
    Returns: The fully-peeled SHA1 of a tag object, after peeling all
      intermediate tags; if the original ref does not point to a tag,
      this will equal the original SHA1.
    """
    peeled = self.refs.get_peeled(ref)
    if peeled is None:
        _unpeeled, target = peel_sha(self.object_store, self.refs[ref])
        peeled = target.id
    return peeled

897 

@property
def notes(self) -> "Notes":
    """Notes interface of this repository.

    A new Notes object is created on every access, built from the
    repository's object store and refs.

    Returns:
      Notes object for accessing notes
    """
    from .notes import Notes

    notes_access = Notes(self.object_store, self.refs)
    return notes_access

908 

def get_walker(self, include: Optional[list[bytes]] = None, **kwargs) -> "Walker":
    """Create a walker over the history of this repository.

    The walker resolves parents through this repository's ``get_parents``;
    the ``get_parents`` keyword is always set by this method.

    Args:
      include: Iterable of SHAs of commits to include along with their
        ancestors. Defaults to [HEAD]
      **kwargs: Additional keyword arguments including:

        * exclude: Iterable of SHAs of commits to exclude along with their
          ancestors, overriding includes.
        * order: ORDER_* constant specifying the order of results.
          Anything other than ORDER_DATE may result in O(n) memory usage.
        * reverse: If True, reverse the order of output, requiring O(n)
          memory.
        * max_entries: The maximum number of entries to yield, or None for
          no limit.
        * paths: Iterable of file or subtree paths to show entries for.
        * rename_detector: diff.RenameDetector object for detecting
          renames.
        * follow: If True, follow path across renames/copies. Forces a
          default rename_detector.
        * since: Timestamp to list commits after.
        * until: Timestamp to list commits before.
        * queue_cls: A class to use for a queue of commits, supporting the
          iterator protocol. The constructor takes a single argument, the Walker.

    Returns: A `Walker` object
    """
    from .walk import Walker

    if include is None:
        include = [self.head()]

    def parents_of(commit):
        return self.get_parents(commit.id, commit)

    kwargs["get_parents"] = parents_of

    return Walker(self.object_store, include, **kwargs)

945 

def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile":
    """Look up a Git object by SHA1 or by ref name.

    Names that are 20 or 40 bytes long are first tried as object ids; if
    that lookup fails, or for any other length, the name is resolved as a
    ref.

    Args:
      name: A Git object SHA1 or a ref name
    Returns: A `ShaFile` object, such as a Commit or Blob
    Raises:
      TypeError: when name is not a bytestring
      KeyError: when the specified ref or object does not exist
    """
    if not isinstance(name, bytes):
        raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
    could_be_sha = len(name) in (20, 40)
    if could_be_sha:
        try:
            return self.object_store[name]
        except (KeyError, ValueError):
            # Not an object id after all; fall through to the ref lookup
            pass
    try:
        target = self.refs[name]
        return self.object_store[target]
    except RefFormatError as exc:
        raise KeyError(name) from exc

966 

967 def __contains__(self, name: bytes) -> bool: 

968 """Check if a specific Git object or ref is present. 

969 

970 Args: 

971 name: Git object SHA1 or ref name 

972 """ 

973 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

974 return name in self.object_store or name in self.refs 

975 else: 

976 return name in self.refs 

977 

def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
    """Point a ref at a new value.

    Args:
      name: ref name; must be b"HEAD" or start with b"refs/"
      value: Ref value - either a ShaFile object, or a hex sha
    Raises:
      ValueError: when name is not an acceptable ref name
      TypeError: when value is neither a ShaFile nor a bytestring
    """
    if not (name.startswith(b"refs/") or name == b"HEAD"):
        raise ValueError(name)
    if isinstance(value, ShaFile):
        self.refs[name] = value.id
        return
    if isinstance(value, bytes):
        self.refs[name] = value
        return
    raise TypeError(value)

994 

def __delitem__(self, name: bytes) -> None:
    """Delete a ref.

    Args:
      name: Name of the ref to remove; must be ``HEAD`` or start with
        ``refs/``
    Raises:
      ValueError: when ``name`` is not an acceptable ref name
    """
    if not name.startswith(b"refs/") and name != b"HEAD":
        raise ValueError(name)
    del self.refs[name]

1005 

def _get_user_identity(
    self, config: "StackedConfig", kind: Optional[str] = None
) -> bytes:
    """Determine the identity to use for new commits.

    Deprecated: emits a DeprecationWarning and delegates to the
    module-level get_user_identity(). ``kind`` is accepted but not
    forwarded.
    """
    deprecation_note = "use get_user_identity() rather than Repo._get_user_identity"
    warnings.warn(deprecation_note, DeprecationWarning)
    return get_user_identity(config)

1015 

def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
    """Add or modify graftpoints.

    Every commit sha and every parent sha is checked with check_hexsha()
    before anything is stored, so an invalid entry leaves the existing
    graftpoints untouched.

    Args:
      updated_graftpoints: Dict of commit shas to list of parent shas
    """
    for commit_sha, parent_shas in updated_graftpoints.items():
        check_hexsha(commit_sha, "Invalid graftpoint")
        for parent_sha in parent_shas:
            check_hexsha(parent_sha, "Invalid graftpoint")

    self._graftpoints.update(updated_graftpoints)

1028 

def _remove_graftpoints(self, to_remove: Iterable[bytes] = ()) -> None:
    """Remove graftpoints.

    Args:
      to_remove: Commit shas whose graftpoints should be dropped. Defaults
        to an empty (immutable) tuple so no mutable object is shared
        between calls.
    Raises:
      KeyError: when one of the shas has no graftpoint
    """
    for sha in to_remove:
        del self._graftpoints[sha]

1037 

def _read_heads(self, name: str) -> list[bytes]:
    """Read the non-blank lines of a named file, stripped of whitespace.

    Args:
      name: Name passed to get_named_file()
    Returns: List of stripped lines; empty if the file does not exist
    """
    handle = self.get_named_file(name)
    if handle is None:
        return []
    heads = []
    with handle:
        for raw_line in handle.readlines():
            stripped = raw_line.strip()
            if stripped:
                heads.append(stripped)
    return heads

1044 

def get_worktree(self) -> "WorkTree":
    """Return the working tree of this repository.

    This implementation always raises.

    Returns:
      WorkTree instance for performing working tree operations

    Raises:
      NotImplementedError: If the repository doesn't support working trees
    """
    reason = "Working tree operations not supported by this repository type"
    raise NotImplementedError(reason)

1057 

@replace_me(remove_in="0.26.0")
def do_commit(
    self,
    message: Optional[bytes] = None,
    committer: Optional[bytes] = None,
    author: Optional[bytes] = None,
    commit_timestamp: Optional[float] = None,
    commit_timezone: Optional[int] = None,
    author_timestamp: Optional[float] = None,
    author_timezone: Optional[int] = None,
    tree: Optional[ObjectID] = None,
    encoding: Optional[bytes] = None,
    ref: Optional[Ref] = b"HEAD",
    merge_heads: Optional[list[ObjectID]] = None,
    no_verify: bool = False,
    sign: bool = False,
) -> bytes:
    """Create a new commit by delegating to the working tree.

    Every argument is forwarded unchanged, by keyword, to
    ``self.get_worktree().commit()``.

    If not specified, committer and author default to
    get_user_identity(..., 'COMMITTER')
    and get_user_identity(..., 'AUTHOR') respectively.

    Args:
      message: Commit message (bytes or callable that takes (repo, commit)
        and returns bytes)
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit
        timestamp)
      author_timezone: Author timestamp timezone
        (defaults to commit timestamp timezone)
      tree: SHA1 of the tree root to use (if not specified the
        current index will be committed).
      encoding: Encoding
      ref: Optional ref to commit to (defaults to current branch).
        If None, creates a dangling commit without updating any ref.
      merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
      no_verify: Skip pre-commit and commit-msg hooks
      sign: GPG Sign the commit (bool, defaults to False,
        pass True to use default GPG key,
        pass a str containing Key ID to use a specific GPG key)

    Returns:
      New commit SHA1
    """
    worktree = self.get_worktree()
    return worktree.commit(
        message=message,
        committer=committer,
        author=author,
        commit_timestamp=commit_timestamp,
        commit_timezone=commit_timezone,
        author_timestamp=author_timestamp,
        author_timezone=author_timezone,
        tree=tree,
        encoding=encoding,
        ref=ref,
        merge_heads=merge_heads,
        no_verify=no_verify,
        sign=sign,
    )

1121 

1122 

def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Trailing carriage returns and newlines are both stripped, so a
    ``.git`` file written with CRLF line endings does not leave a stray
    ``\\r`` at the end of the returned path.

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: when the content does not start with "gitdir: "
    """
    prefix = b"gitdir: "
    cs = f.read()
    if not cs.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return cs[len(prefix) :].rstrip(b"\r\n").decode("utf-8")

1136 

1137 

class UnsupportedVersion(Exception):
    """Raised for a repository format version that is not supported."""

    def __init__(self, version) -> None:
        """Remember the offending version on the exception.

        Args:
          version: The unsupported repository version
        """
        self.version = version

1148 

1149 

class UnsupportedExtension(Exception):
    """Raised for a repository extension that is not supported."""

    def __init__(self, extension) -> None:
        """Remember the offending extension on the exception.

        Args:
          extension: The unsupported repository extension
        """
        self.extension = extension

1160 

1161 

1162class Repo(BaseRepo): 

1163 """A git repository backed by local disk. 

1164 

1165 To open an existing repository, call the constructor with 

1166 the path of the repository. 

1167 

1168 To create a new repository, use the Repo.init class method. 

1169 

1170 Note that a repository object may hold on to resources such 

1171 as file handles for performance reasons; call .close() to free 

1172 up those resources. 

1173 

1174 Attributes: 

1175 path: Path to the working copy (if it exists) or repository control 

1176 directory (if the repository is bare) 

1177 bare: Whether this is a bare repository 

1178 """ 

1179 

1180 path: str 

1181 bare: bool 

1182 object_store: DiskObjectStore 

1183 

def __init__(
    self,
    root: Union[str, bytes, os.PathLike],
    object_store: Optional[PackBasedObjectStore] = None,
    bare: Optional[bool] = None,
) -> None:
    """Open a repository on disk.

    Args:
      root: Path to the repository's root.
      object_store: ObjectStore to use; if omitted, we use the
        repository's default object store
      bare: True if this is a bare repository. When None, the layout
        found under ``root`` decides.

    Raises:
      NotGitRepository: when ``bare`` is None and no repository layout is
        found at ``root``
      UnsupportedVersion: when core.repositoryformatversion is not 0 or 1
      UnsupportedExtension: when an unrecognised extension is configured
    """
    root = os.fspath(root)
    if isinstance(root, bytes):
        root = os.fsdecode(root)
    hidden_path = os.path.join(root, CONTROLDIR)

    if bare is None:
        # Autodetect: a control entry (gitfile, or a directory holding an
        # object dir) means non-bare; object and refs dirs directly under
        # root mean bare.
        if os.path.isfile(hidden_path) or os.path.isdir(
            os.path.join(hidden_path, OBJECTDIR)
        ):
            bare = False
        elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
            os.path.join(root, REFSDIR)
        ):
            bare = True
        else:
            raise NotGitRepository(
                "No git repository was found at {path}".format(path=root)
            )

    self.bare = bare
    if bare is False:
        if os.path.isfile(hidden_path):
            # The control entry is a gitfile pointing at the real control dir.
            with open(hidden_path, "rb") as gitfile:
                gitdir = read_gitfile(gitfile)
            self._controldir = os.path.join(root, gitdir)
        else:
            self._controldir = hidden_path
    else:
        self._controldir = root

    commondir_file = self.get_named_file(COMMONDIR)
    if commondir_file is None:
        self._commondir = self._controldir
    else:
        with commondir_file:
            self._commondir = os.path.join(
                self.controldir(),
                os.fsdecode(commondir_file.read().rstrip(b"\r\n")),
            )
    self.path = root

    # Initialize refs early so they're available for config condition matchers
    self.refs = DiskRefsContainer(
        self.commondir(), self._controldir, logger=self._write_reflog
    )

    # Initialize worktrees container
    from .worktree import WorkTreeContainer

    self.worktrees = WorkTreeContainer(self)

    config = self.get_config()
    try:
        raw_format_version = config.get("core", "repositoryformatversion")
    except KeyError:
        format_version = 0
    else:
        format_version = 0 if raw_format_version is None else int(raw_format_version)

    if format_version not in (0, 1):
        raise UnsupportedVersion(format_version)

    # Track extensions we encounter
    use_reftable = False
    for extension, value in config.items((b"extensions",)):
        extension_name = extension.lower()
        if extension_name == b"refstorage":
            if value != b"reftable":
                raise UnsupportedExtension(f"refStorage = {value.decode()}")
            use_reftable = True
        elif extension_name not in (b"worktreeconfig",):
            raise UnsupportedExtension(extension)

    if object_store is None:
        object_store = DiskObjectStore.from_config(
            os.path.join(self.commondir(), OBJECTDIR), config
        )

    # Use reftable if extension is configured
    if use_reftable:
        from .reftable import ReftableRefsContainer

        self.refs = ReftableRefsContainer(self.commondir())
        # Update worktrees container after refs change
        self.worktrees = WorkTreeContainer(self)
    BaseRepo.__init__(self, object_store, self.refs)

    # Graft points come from info/grafts first, then from the shallow file,
    # so entries in the latter override the former.
    self._graftpoints = {}
    for graft_source in (os.path.join("info", "grafts"), "shallow"):
        graft_file = self.get_named_file(graft_source, basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

    self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
    self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
    self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
    self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

1302 

def get_worktree(self) -> "WorkTree":
    """Return the working tree rooted at this repository's path.

    Returns:
      WorkTree instance for performing working tree operations
    """
    from .worktree import WorkTree

    worktree = WorkTree(self, self.path)
    return worktree

1312 

def _write_reflog(
    self, ref, old_sha, new_sha, committer, timestamp, timezone, message
) -> None:
    """Append one entry to the reflog of ``ref`` under the control dir.

    Args:
      ref: Reference name; decoded with os.fsdecode to build the log path
      old_sha: Previous value of the ref
      new_sha: New value of the ref
      committer: Identity to record; read from the config stack when None
      timestamp: Time of the change; the current time when None
      timezone: Timezone offset; 0 when None
      message: Reflog message
    """
    from .reflog import format_reflog_line

    log_path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
    try:
        os.makedirs(os.path.dirname(log_path))
    except FileExistsError:
        pass
    if committer is None:
        committer = get_user_identity(self.get_config_stack())
    check_user_identity(committer)
    if timestamp is None:
        timestamp = int(time.time())
    if timezone is None:
        timezone = 0  # FIXME
    with open(log_path, "ab") as log:
        entry = format_reflog_line(
            old_sha, new_sha, committer, timestamp, timezone, message
        )
        log.write(entry + b"\n")

1338 

def read_reflog(self, ref):
    """Iterate over the reflog entries recorded for a reference.

    A missing reflog file yields nothing.

    Args:
      ref: Reference name (e.g. b'HEAD', b'refs/heads/master')

    Yields:
      reflog.Entry objects in chronological order (oldest first)
    """
    from .reflog import read_reflog as parse_reflog

    log_path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
    try:
        with open(log_path, "rb") as log:
            yield from parse_reflog(log)
    except FileNotFoundError:
        return

1356 

@classmethod
def discover(cls, start="."):
    """Walk up from ``start`` until a directory opens as a repository.

    Return a Repo object for the first parent directory that looks like a
    Git repository.

    Args:
      start: The directory to start discovery from (defaults to '.')
    Raises:
      NotGitRepository: when no directory up to the root is a repository
    """
    candidate = os.path.abspath(start)
    while True:
        try:
            return cls(candidate)
        except NotGitRepository:
            candidate, tail = os.path.split(candidate)
            if not tail:
                # Nothing left to strip: the filesystem root was tried.
                break
    raise NotGitRepository(
        "No git repository was found at {path}".format(path=start)
    )

1377 

def controldir(self) -> str:
    """Return the control directory path stored on this repository."""
    control_path = self._controldir
    return control_path

1381 

def commondir(self) -> str:
    """Return the common directory path stored on this repository.

    For a main working tree, it is identical to controldir().

    For a linked working tree, it is the control directory of the
    main working tree.
    """
    common_path = self._commondir
    return common_path

1391 

def _determine_file_mode(self) -> bool:
    """Probe the file-system to determine whether permissions can be trusted.

    A scratch file ``.probe-permissions`` is created in the repository
    path, its user-executable bit is flipped, and the resulting mode is
    compared with the original. The scratch file is always removed, also
    when changing the mode is refused.

    Returns: True if permissions can be trusted, False otherwise.
    """
    fname = os.path.join(self.path, ".probe-permissions")
    with open(fname, "w") as f:
        f.write("")

    try:
        st1 = os.lstat(fname)
        try:
            os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
        except PermissionError:
            return False
        st2 = os.lstat(fname)
    finally:
        # Never leave the probe file behind in the working tree.
        os.unlink(fname)

    mode_differs = st1.st_mode != st2.st_mode
    st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0

    return mode_differs and st2_has_exec

1414 

def _determine_symlinks(self) -> bool:
    """Decide whether symlinks can be created.

    No filesystem probing happens here; the answer depends only on the
    platform.

    Returns: True if symlinks can be created, False otherwise.
    """
    # TODO(jelmer): Actually probe disk / look at filesystem
    on_windows = sys.platform == "win32"
    return not on_windows

1422 

def _put_named_file(self, path: str, contents: bytes) -> None:
    """Write ``contents`` to a file inside the control dir.

    Args:
      path: The path to the file, relative to the control dir. Leading
        path separators are stripped.
      contents: A string to write to the file.
    """
    relative = path.lstrip(os.path.sep)
    destination = os.path.join(self.controldir(), relative)
    with GitFile(destination, "wb") as out:
        out.write(contents)

1433 

def _del_named_file(self, path: str) -> None:
    """Delete a file from the control dir; a missing file is not an error.

    Leading path separators are stripped, as in _put_named_file() and
    get_named_file(), so the name is always resolved inside the control
    dir instead of being treated as an absolute path by os.path.join().

    Args:
      path: The path to the file, relative to the control dir.
    """
    path = path.lstrip(os.path.sep)
    try:
        os.unlink(os.path.join(self.controldir(), path))
    except FileNotFoundError:
        return

1439 

def get_named_file(self, path, basedir=None):
    """Open a named file from the control dir for binary reading.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-based Repo, the object returned need not be
    pointing to a file in that location.

    Args:
      path: The path to the file, relative to the control dir. Leading
        path separators are stripped.
      basedir: Optional argument that specifies an alternative to the
        control dir.
    Returns: An open file object, or None if the file does not exist.
    """
    # TODO(dborowitz): sanitize filenames, since this is used directly by
    # the dumb web serving code.
    base = self.controldir() if basedir is None else basedir
    relative = path.lstrip(os.path.sep)
    try:
        return open(os.path.join(base, relative), "rb")
    except FileNotFoundError:
        return None

1462 

def index_path(self):
    """Return the index file path inside the control directory."""
    control_path = self.controldir()
    return os.path.join(control_path, INDEX_FILENAME)

1466 

def open_index(self) -> "Index":
    """Open the index for this repository.

    The index version and the skip-hash flag are read from the config
    stack. With feature.manyFiles enabled the defaults become version 4
    and skipHash=true; explicit index.version / index.skipHash settings
    take precedence either way.

    Raises:
      NoIndexPresent: If no index is present
    Returns: The matching `Index`
    """
    from .index import Index

    if not self.has_index():
        raise NoIndexPresent

    config = self.get_config_stack()
    many_files = config.get_boolean(b"feature", b"manyFiles", False)

    # Only the fallback values differ between the manyFiles and the
    # regular case.
    fallback_version = 4 if many_files else None
    fallback_skip_hash = True if many_files else False

    try:
        index_version = int(config.get(b"index", b"version"))
    except KeyError:
        index_version = fallback_version
    skip_hash = config.get_boolean(b"index", b"skipHash", fallback_skip_hash)

    return Index(self.index_path(), skip_hash=skip_hash, version=index_version)

1503 

def has_index(self) -> bool:
    """Report whether this repository has an index."""
    # Bare repos must never have index files; non-bare repos may have a
    # missing index file, which is treated as empty.
    is_bare = self.bare
    return not is_bare

1509 

@replace_me(remove_in="0.26.0")
def stage(
    self,
    fs_paths: Union[
        str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
    ],
) -> None:
    """Stage a set of paths by delegating to the working tree.

    Args:
      fs_paths: List of paths, relative to the repository path
    """
    worktree = self.get_worktree()
    return worktree.stage(fs_paths)

1523 

@replace_me(remove_in="0.26.0")
def unstage(self, fs_paths: list[str]) -> None:
    """Unstage specific files in the index by delegating to the working tree.

    Args:
      fs_paths: a list of files to unstage,
        relative to the repository path.
    """
    worktree = self.get_worktree()
    return worktree.unstage(fs_paths)

1533 

def clone(
    self,
    target_path,
    *,
    mkdir=True,
    bare=False,
    origin=b"origin",
    checkout=None,
    branch=None,
    progress=None,
    depth: Optional[int] = None,
    symlinks=None,
) -> "Repo":
    """Clone this repository.

    On any failure the partially created target repository is closed and,
    if this call created the target directory, that directory is removed
    again before the error propagates.

    Args:
      target_path: Target path
      mkdir: Create the target directory
      bare: Whether to create a bare repository
      checkout: Whether or not to check-out HEAD after cloning
      origin: Base name for refs in target repository
        cloned from this repository
      branch: Optional branch or tag to be used as HEAD in the new repository
        instead of this repository's HEAD.
      progress: Optional progress function
      depth: Depth at which to fetch
      symlinks: Symlinks setting (default to autodetect)
    Returns: Created repository as `Repo`
    Raises:
      ValueError: when both ``bare`` and ``checkout`` are requested
    """
    source_url = os.fsencode(self.path)

    if mkdir:
        os.mkdir(target_path)

    try:
        if bare:
            if checkout:
                raise ValueError("checkout and bare are incompatible")
            target = Repo.init_bare(target_path)
        else:
            target = Repo.init(target_path, symlinks=symlinks)
            if checkout is None:
                checkout = True

        try:
            # Record where the clone came from in the new repository's config.
            target_config = target.get_config()
            target_config.set((b"remote", origin), b"url", source_url)
            target_config.set(
                (b"remote", origin),
                b"fetch",
                b"+refs/heads/*:refs/remotes/" + origin + b"/*",
            )
            target_config.write_to_path()

            ref_message = b"clone: from " + source_url
            self.fetch(target, depth=depth)
            target.refs.import_refs(
                b"refs/remotes/" + origin,
                self.refs.as_dict(b"refs/heads"),
                message=ref_message,
            )
            target.refs.import_refs(
                b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
            )

            head_chain, origin_sha = self.refs.follow(b"HEAD")
            origin_head = head_chain[-1] if head_chain else None
            if origin_sha and not origin_head:
                # set detached HEAD
                # NOTE(review): `head` is not assigned on this path but is
                # read below - confirm this branch cannot be reached.
                target.refs[b"HEAD"] = origin_sha
            else:
                _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                head = (
                    _set_head(target.refs, head_ref, ref_message)
                    if head_ref
                    else None
                )

            if checkout and head is not None:
                target.get_worktree().reset_index()
        except BaseException:
            target.close()
            raise
    except BaseException:
        if mkdir:
            import shutil

            shutil.rmtree(target_path)
        raise
    return target

1628 

@replace_me(remove_in="0.26.0")
def reset_index(self, tree: Optional[bytes] = None):
    """Reset the index back to a specific tree via the working tree.

    Args:
      tree: Tree SHA to reset to, None for current HEAD tree.
    """
    worktree = self.get_worktree()
    return worktree.reset_index(tree)

1637 

def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
    """Build the matchers used to evaluate includeIf conditions.

    Returns a dict of condition prefix (``onbranch:``, ``gitdir:``,
    ``gitdir/i:``) to matcher function.
    """
    from pathlib import Path

    from .config import ConditionMatcher, match_glob_pattern

    def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
        """Match the control directory against a gitdir pattern.

        Args:
          pattern: Pattern to match against
          case_sensitive: Whether to match case-sensitively

        Returns:
          True if gitdir matches pattern
        """
        # Relative patterns (starting with ./) need the directory of the
        # config file, which is not available here.
        if pattern.startswith("./"):
            return False

        try:
            repo_path = str(Path(self._controldir).resolve())
        except (OSError, ValueError):
            return False

        # Expand ~ first, then normalize the pattern following Git's rules.
        pattern = os.path.expanduser(pattern)
        pattern = pattern.replace("\\", "/")
        if not pattern.startswith(("~/", "./", "/", "**")):
            # Anything that is not anchored (and not a Windows absolute path
            # such as C:/...) may match at any depth.
            windows_absolute = len(pattern) >= 2 and pattern[1] == ":"
            if not windows_absolute:
                pattern = "**/" + pattern
        if pattern.endswith("/"):
            pattern = pattern + "**"

        # Use the existing _match_gitdir_pattern function
        from .config import _match_gitdir_pattern

        pattern_bytes = pattern.encode("utf-8", errors="replace")
        repo_path_bytes = repo_path.encode("utf-8", errors="replace")

        return _match_gitdir_pattern(
            repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
        )

    def match_onbranch(pattern: str) -> bool:
        """Match the currently checked-out branch against a pattern.

        Args:
          pattern: Pattern to match against

        Returns:
          True if current branch matches pattern
        """
        try:
            # Resolve HEAD through the refs; the last entry of the chain is
            # the final resolved ref.
            ref_chain, _ = self.refs.follow(b"HEAD")
            head_ref = ref_chain[-1]
        except KeyError:
            return False
        if head_ref and head_ref.startswith(b"refs/heads/"):
            # Drop the "refs/heads/" prefix (11 bytes) to get the branch name.
            branch = head_ref[11:].decode("utf-8", errors="replace")
            return match_glob_pattern(branch, pattern)
        return False

    matchers: dict[str, ConditionMatcher] = {
        "onbranch:": match_onbranch,
        "gitdir:": lambda pattern: match_gitdir(pattern, True),
        "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
    }

    return matchers

1723 

def get_worktree_config(self) -> "ConfigFile":
    """Load ``config.worktree`` from the common directory.

    When the file does not exist an empty ConfigFile bound to that path
    is returned instead.

    Returns:
      ConfigFile object for the worktree config
    """
    from .config import ConfigFile

    config_path = os.path.join(self.commondir(), "config.worktree")
    try:
        # Pass condition matchers for includeIf evaluation
        matchers = self._get_config_condition_matchers()
        return ConfigFile.from_path(config_path, condition_matchers=matchers)
    except FileNotFoundError:
        empty = ConfigFile()
        empty.path = config_path
        return empty

1741 

def get_config(self) -> "ConfigFile":
    """Load the repository config from the common directory.

    When the file does not exist an empty ConfigFile bound to that path
    is returned instead.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    """
    from .config import ConfigFile

    config_path = os.path.join(self._commondir, "config")
    try:
        # Pass condition matchers for includeIf evaluation
        matchers = self._get_config_condition_matchers()
        return ConfigFile.from_path(config_path, condition_matchers=matchers)
    except FileNotFoundError:
        empty = ConfigFile()
        empty.path = config_path
        return empty

1758 

def get_rebase_state_manager(self):
    """Return the rebase state manager for this repository.

    The manager is bound to the ``rebase-merge`` path inside the control
    directory.

    Returns: DiskRebaseStateManager instance
    """
    from .rebase import DiskRebaseStateManager

    state_dir = os.path.join(self.controldir(), "rebase-merge")
    return DiskRebaseStateManager(state_dir)

1770 

def get_description(self):
    """Read the ``description`` file from the control directory.

    Returns: The raw content of the file, or None if it does not exist.
    """
    description_path = os.path.join(self._controldir, "description")
    try:
        with GitFile(description_path, "rb") as description_file:
            return description_file.read()
    except FileNotFoundError:
        return None

1782 

def __repr__(self) -> str:
    """Return a debug representation that shows the repository path."""
    location = repr(self.path)
    return f"<Repo at {location}>"

1786 

def set_description(self, description) -> None:
    """Store the description of this repository.

    The text is written to the ``description`` named file.

    Args:
      description: Text to set as description for this repository.
    """
    target_name = "description"
    self._put_named_file(target_name, description)

1794 

@classmethod
def _init_maybe_bare(
    cls,
    path: Union[str, bytes, os.PathLike],
    controldir: Union[str, bytes, os.PathLike],
    bare,
    object_store=None,
    config=None,
    default_branch=None,
    symlinks: Optional[bool] = None,
    format: Optional[int] = None,
):
    """Lay out a new control directory and open it as a repository.

    Args:
      path: Path of the repository to open once the layout exists
      controldir: Directory in which the base directories are created
      bare: Whether the repository is bare
      object_store: Object store to use; a new DiskObjectStore is
        initialised under ``controldir`` when omitted
      config: Configuration consulted for ``init.defaultBranch`` when
        ``default_branch`` is not given; the default stacked config is
        used when omitted
      default_branch: Name of the branch HEAD should point at
      symlinks: Passed on to _init_files()
      format: Passed on to _init_files()
    Returns: The newly opened repository
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    controldir = os.fspath(controldir)
    if isinstance(controldir, bytes):
        controldir = os.fsdecode(controldir)

    for parts in BASE_DIRECTORIES:
        os.mkdir(os.path.join(controldir, *parts))
    if object_store is None:
        object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
    repo = cls(path, bare=bare, object_store=object_store)

    if default_branch is None:
        if config is None:
            from .config import StackedConfig

            config = StackedConfig.default()
        try:
            default_branch = config.get("init", "defaultBranch")
        except KeyError:
            default_branch = DEFAULT_BRANCH
    repo.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
    repo._init_files(bare=bare, symlinks=symlinks, format=format)
    return repo

1830 

@classmethod
def init(
    cls,
    path: Union[str, bytes, os.PathLike],
    *,
    mkdir: bool = False,
    config=None,
    default_branch=None,
    symlinks: Optional[bool] = None,
    format: Optional[int] = None,
) -> "Repo":
    """Create a new non-bare repository.

    The control directory is created inside ``path`` and marked hidden
    before the repository layout is written into it.

    Args:
      path: Path in which to create the repository
      mkdir: Whether to create the directory
      config: Configuration object
      default_branch: Default branch name
      symlinks: Whether to support symlinks
      format: Repository format version (defaults to 0)
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)

    control_path = os.path.join(path, CONTROLDIR)
    os.mkdir(control_path)
    _set_filesystem_hidden(control_path)

    return cls._init_maybe_bare(
        path,
        control_path,
        False,
        config=config,
        default_branch=default_branch,
        symlinks=symlinks,
        format=format,
    )

1870 

@classmethod
def _init_new_working_directory(
    cls,
    path: Union[str, bytes, os.PathLike],
    main_repo,
    identifier=None,
    mkdir=False,
):
    """Create a new working directory linked to a repository.

    A gitfile is written into ``path`` pointing at a per-worktree control
    directory under the main repository's worktrees directory; that
    directory receives the gitdir, commondir and HEAD files.

    Args:
      path: Path in which to create the working tree.
      main_repo: Main repository to reference
      identifier: Worktree identifier; defaults to the basename of ``path``
      mkdir: Whether to create the directory
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    if identifier is None:
        identifier = os.path.basename(path)

    # Ensure we use absolute path for the worktree control directory
    main_controldir = os.path.abspath(main_repo.controldir())
    main_worktreesdir = os.path.join(main_controldir, WORKTREES)
    worktree_controldir = os.path.join(main_worktreesdir, identifier)
    gitdirfile = os.path.join(path, CONTROLDIR)

    with open(gitdirfile, "wb") as out:
        out.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

    # Both directories may already exist; that is not an error.
    for directory in (main_worktreesdir, worktree_controldir):
        try:
            os.mkdir(directory)
        except FileExistsError:
            pass

    with open(os.path.join(worktree_controldir, GITDIR), "wb") as out:
        out.write(os.fsencode(gitdirfile) + b"\n")
    with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as out:
        out.write(b"../..\n")
    with open(os.path.join(worktree_controldir, "HEAD"), "wb") as out:
        out.write(main_repo.head() + b"\n")

    worktree_repo = cls(os.path.normpath(path))
    worktree_repo.get_worktree().reset_index()
    return worktree_repo

1919 

@classmethod
def init_bare(
    cls,
    path: Union[str, bytes, os.PathLike],
    *,
    mkdir=False,
    object_store=None,
    config=None,
    default_branch=None,
    format: Optional[int] = None,
):
    """Create a new bare repository.

    ``path`` should already exist and be an empty directory, unless
    ``mkdir`` is set. The repository layout is written directly into
    ``path``, which serves as the control directory.

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create the directory
      object_store: Object store to use
      config: Configuration object
      default_branch: Default branch name
      format: Repository format version (defaults to 0)
    Returns: a `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)

    layout_options = dict(
        object_store=object_store,
        config=config,
        default_branch=default_branch,
        format=format,
    )
    return cls._init_maybe_bare(path, path, True, **layout_options)

create = init_bare

1960 

def close(self) -> None:
    """Release resources by closing the object store."""
    store = self.object_store
    store.close()

1964 

1965 def __enter__(self): 

1966 """Enter context manager.""" 

1967 return self 

1968 

1969 def __exit__(self, exc_type, exc_val, exc_tb): 

1970 """Exit context manager and close repository.""" 

1971 self.close() 

1972 

1973 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

1974 """Read .gitattributes file from working tree. 

1975 

1976 Returns: 

1977 Dictionary mapping file patterns to attributes 

1978 """ 

1979 gitattributes = {} 

1980 gitattributes_path = os.path.join(self.path, ".gitattributes") 

1981 

1982 if os.path.exists(gitattributes_path): 

1983 with open(gitattributes_path, "rb") as f: 

1984 for line in f: 

1985 line = line.strip() 

1986 if not line or line.startswith(b"#"): 

1987 continue 

1988 

1989 parts = line.split() 

1990 if len(parts) < 2: 

1991 continue 

1992 

1993 pattern = parts[0] 

1994 attrs = {} 

1995 

1996 for attr in parts[1:]: 

1997 if attr.startswith(b"-"): 

1998 # Unset attribute 

1999 attrs[attr[1:]] = b"false" 

2000 elif b"=" in attr: 

2001 # Set to value 

2002 key, value = attr.split(b"=", 1) 

2003 attrs[key] = value 

2004 else: 

2005 # Set attribute 

2006 attrs[attr] = b"true" 

2007 

2008 gitattributes[pattern] = attrs 

2009 

2010 return gitattributes 

2011 

def get_blob_normalizer(self):
    """Build the blob normalizer for this repository.

    Returns:
      A FilterBlobNormalizer built from the repository's gitattributes,
      its config stack and a FilterRegistry bound to this repository.
    """
    from .filters import FilterBlobNormalizer, FilterRegistry

    # Gather the two inputs shared by the registry and the normalizer.
    git_attributes = self.get_gitattributes()
    config_stack = self.get_config_stack()

    # The registry is given the repository as well as the configuration;
    # the resulting normalizer is the single entry point for all filters,
    # including line-ending handling.
    return FilterBlobNormalizer(
        config_stack,
        git_attributes,
        FilterRegistry(config_stack, self),
        self,
    )

2025 

def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
    """Collect gitattributes patterns for the repository.

    Patterns are gathered, in this order, from the ``.gitattributes`` blob
    of the given tree, from ``info/attributes`` in the control directory
    and from ``.gitattributes`` under ``self.path``. Sources that are
    missing are silently skipped.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    patterns = []

    def collect(stream) -> None:
        # Parse one attributes source and queue (Pattern, attrs) pairs.
        patterns.extend(
            (Pattern(raw_pattern), parsed_attrs)
            for raw_pattern, parsed_attrs in parse_git_attributes(stream)
        )

    # Read system gitattributes (TODO: implement this)
    # Read global gitattributes (TODO: implement this)

    # Without an explicit tree, fall back to the tree of HEAD.
    if tree is None:
        try:
            head = self[b"HEAD"]
            if isinstance(head, Tag):
                # HEAD is a tag object: step to the object it points at.
                _cls, target = head.object
                head = self.get_object(target)
            assert isinstance(head, Commit)
            tree = head.tree
        except KeyError:
            # No HEAD, no attributes from tree
            pass

    # Repository .gitattributes as stored in the tree.
    if tree is not None:
        try:
            tree_obj = self[tree]
            assert isinstance(tree_obj, Tree)
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    collect(BytesIO(attrs_blob.data))
        except (KeyError, NotTreeError):
            pass

    # On-disk sources: info/attributes in the control dir first, then
    # .gitattributes in the working directory (each only if it exists).
    disk_sources = (
        os.path.join(self.controldir(), "info", "attributes"),
        os.path.join(self.path, ".gitattributes"),
    )
    for attrs_path in disk_sources:
        if os.path.exists(attrs_path):
            with open(attrs_path, "rb") as f:
                collect(f)

    return GitAttributes(patterns)

2092 

@replace_me(remove_in="0.26.0")
def _sparse_checkout_file_path(self) -> str:
    """Return the path of the sparse-checkout file in this repo's control dir.

    Deprecated forwarding shim: the result comes straight from
    ``self.get_worktree()._sparse_checkout_file_path()``. The decorator
    argument marks this method for removal in 0.26.0.
    """
    # NOTE(review): keep the body as a single forwarding return -- presumably
    # replace_me relies on it to suggest the replacement call; confirm before
    # restructuring.
    return self.get_worktree()._sparse_checkout_file_path()

2097 

@replace_me(remove_in="0.26.0")
def configure_for_cone_mode(self) -> None:
    """Ensure the repository is configured for cone-mode sparse-checkout.

    Deprecated forwarding shim: the work is done by
    ``self.get_worktree().configure_for_cone_mode()``. The decorator
    argument marks this method for removal in 0.26.0.
    """
    # NOTE(review): keep the body as a single forwarding return -- presumably
    # replace_me relies on it to suggest the replacement call; confirm before
    # restructuring.
    return self.get_worktree().configure_for_cone_mode()

2102 

@replace_me(remove_in="0.26.0")
def infer_cone_mode(self) -> bool:
    """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

    Deprecated forwarding shim: the value comes straight from
    ``self.get_worktree().infer_cone_mode()``. The decorator argument
    marks this method for removal in 0.26.0.
    """
    # NOTE(review): keep the body as a single forwarding return -- presumably
    # replace_me relies on it to suggest the replacement call; confirm before
    # restructuring.
    return self.get_worktree().infer_cone_mode()

2107 

@replace_me(remove_in="0.26.0")
def get_sparse_checkout_patterns(self) -> list[str]:
    """Return a list of sparse-checkout patterns from info/sparse-checkout.

    Deprecated forwarding shim: the result comes straight from
    ``self.get_worktree().get_sparse_checkout_patterns()``. The decorator
    argument marks this method for removal in 0.26.0.

    Returns:
      A list of patterns. Returns an empty list if the file is missing.
    """
    # NOTE(review): keep the body as a single forwarding return -- presumably
    # replace_me relies on it to suggest the replacement call; confirm before
    # restructuring.
    return self.get_worktree().get_sparse_checkout_patterns()

2116 

@replace_me(remove_in="0.26.0")
def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
    """Write the given sparse-checkout patterns into info/sparse-checkout.

    Creates the info/ directory if it does not exist.

    Deprecated forwarding shim: the work is done by
    ``self.get_worktree().set_sparse_checkout_patterns(patterns)``. The
    decorator argument marks this method for removal in 0.26.0.

    Args:
      patterns: A list of gitignore-style patterns to store.
    """
    # NOTE(review): keep the body as a single forwarding return -- presumably
    # replace_me relies on it to suggest the replacement call; confirm before
    # restructuring.
    return self.get_worktree().set_sparse_checkout_patterns(patterns)

2127 

@replace_me(remove_in="0.26.0")
def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
    """Write the given cone-mode directory patterns into info/sparse-checkout.

    For each directory to include, add an inclusion line that "undoes" the prior
    ``!/*/`` 'exclude' that re-includes that directory and everything under it.
    Never add the same line twice.

    Deprecated forwarding shim: the work is done by
    ``self.get_worktree().set_cone_mode_patterns(dirs)``. The decorator
    argument marks this method for removal in 0.26.0.

    Args:
      dirs: Directories to include, passed through unchanged; may be None
        (the default).
    """
    # NOTE(review): keep the body as a single forwarding return -- presumably
    # replace_me relies on it to suggest the replacement call; confirm before
    # restructuring.
    return self.get_worktree().set_cone_mode_patterns(dirs)

2137 

2138 

class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new, empty repository in memory."""
        from .config import ConfigFile

        # Reflog entries are kept as the raw argument tuples handed to the
        # refs container's logger callback.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore[arg-type]
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None

    def _append_reflog(self, *args) -> None:
        """Record one reflog entry (used as the refs container's logger)."""
        self._reflog.append(args)

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description
        """
        self._description = description

    def get_description(self):
        """Get the description of this repository.

        Returns:
          The value last passed to ``set_description``, or None if no
          description has been set.
        """
        return self._description

    def _determine_file_mode(self):
        """Report whether file permissions can be trusted.

        Nothing is probed: the answer depends on the platform only.

        Returns: True unless running on win32.
        """
        return sys.platform != "win32"

    def _determine_symlinks(self):
        """Report whether symlinks are considered supported.

        Nothing is probed: the answer depends on the platform only.

        Returns: True unless running on win32.
        """
        return sys.platform != "win32"

    def _put_named_file(self, path, contents) -> None:
        """Store a named control-dir file in memory.

        Args:
          path: The path to the file, relative to the control dir.
          contents: Bytes to store as the file's contents.
        """
        self._named_files[path] = contents

    def _del_named_file(self, path) -> None:
        """Forget a named file; a missing entry is not an error."""
        self._named_files.pop(path, None)

    def get_named_file(self, path, basedir=None):
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-backed Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
          basedir: Optional base directory for the path (not used here)
        Returns: An open file object, or None if the file does not exist.
        """
        contents = self._named_files.get(path)
        if contents is None:
            return None
        return BytesIO(contents)

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        return self._config

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)

    def get_blob_normalizer(self):
        """Return a BlobNormalizer object for checkin/checkout operations."""
        from .filters import FilterBlobNormalizer, FilterRegistry

        git_attributes = self.get_gitattributes()
        config_stack = self.get_config_stack()

        # The registry needs a reference to the repository as well as the
        # configuration.
        filter_registry = FilterRegistry(config_stack, self)

        return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Ignored by this implementation.

        Returns:
          An empty GitAttributes object.
        """
        from .attrs import GitAttributes

        # Memory repos don't have working trees or gitattributes files.
        return GitAttributes([])

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message, or a callable invoked as
            ``message(self, commit)`` that returns the message
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use (required)
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: If ``tree`` is missing or not 40 bytes long, or if no
            commit message is available.
          CommitError: If ``ref`` could not be updated because it changed
            during the commit.
        """
        import time

        from .objects import Commit

        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")

        c = Commit()
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []

        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone

        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        check_user_identity(author)
        c.author = author
        if author_timestamp is None:
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone

        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message is given the repository and the commit built so
        # far, and must produce the final message.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
            return c.id

        reflog_details = {
            "message": b"commit: " + message,
            "committer": committer,
            "timestamp": commit_timestamp,
            "timezone": commit_timezone,
        }
        # Only the ref lookup is guarded: a KeyError raised while storing the
        # object or updating the ref must propagate instead of being mistaken
        # for "ref does not exist yet".
        try:
            old_head = self.refs[ref]
        except KeyError:
            c.parents = merge_heads
            self.object_store.add_object(c)
            ok = self.refs.add_if_new(ref, c.id, **reflog_details)
        else:
            c.parents = [old_head, *merge_heads]
            self.object_store.add_object(c)
            ok = self.refs.set_if_equals(ref, old_head, c.id, **reflog_details)

        if not ok:
            from .errors import CommitError

            raise CommitError(f"{ref!r} changed during commit")

        return c.id

    @classmethod
    def init_bare(cls, objects, refs, format: Optional[int] = None):
        """Create a new bare repository in memory.

        Args:
          objects: Objects for the new repository,
            as iterable
          refs: Refs as dictionary, mapping names
            to object SHA1s
          format: Repository format version (defaults to 0)
        Returns: the newly created repository
        """
        ret = cls()
        for obj in objects:
            ret.object_store.add_object(obj)
        for refname, sha in refs.items():
            ret.refs.add_if_new(refname, sha)
        ret._init_files(bare=True, format=format)
        return ret
2202 del self._named_files[path] 

2203 except KeyError: 

2204 pass 

2205 

2206 def get_named_file(self, path, basedir=None): 

2207 """Get a file from the control dir with a specific name. 

2208 

2209 Although the filename should be interpreted as a filename relative to 

2210 the control dir in a disk-baked Repo, the object returned need not be 

2211 pointing to a file in that location. 

2212 

2213 Args: 

2214 path: The path to the file, relative to the control dir. 

2215 basedir: Optional base directory for the path 

2216 Returns: An open file object, or None if the file does not exist. 

2217 """ 

2218 contents = self._named_files.get(path, None) 

2219 if contents is None: 

2220 return None 

2221 return BytesIO(contents) 

2222 

2223 def open_index(self) -> "Index": 

2224 """Fail to open index for this repo, since it is bare. 

2225 

2226 Raises: 

2227 NoIndexPresent: Raised when no index is present 

2228 """ 

2229 raise NoIndexPresent 

2230 

2231 def get_config(self): 

2232 """Retrieve the config object. 

2233 

2234 Returns: `ConfigFile` object. 

2235 """ 

2236 return self._config 

2237 

2238 def get_rebase_state_manager(self): 

2239 """Get the appropriate rebase state manager for this repository. 

2240 

2241 Returns: MemoryRebaseStateManager instance 

2242 """ 

2243 from .rebase import MemoryRebaseStateManager 

2244 

2245 return MemoryRebaseStateManager(self) 

2246 

2247 def get_blob_normalizer(self): 

2248 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2249 from .filters import FilterBlobNormalizer, FilterRegistry 

2250 

2251 # Get GitAttributes object 

2252 git_attributes = self.get_gitattributes() 

2253 config_stack = self.get_config_stack() 

2254 

2255 # Create FilterRegistry with repo reference 

2256 filter_registry = FilterRegistry(config_stack, self) 

2257 

2258 # Return FilterBlobNormalizer which handles all filters 

2259 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

2260 

2261 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

2262 """Read gitattributes for the repository.""" 

2263 from .attrs import GitAttributes 

2264 

2265 # Memory repos don't have working trees or gitattributes files 

2266 # Return empty GitAttributes 

2267 return GitAttributes([]) 

2268 

2269 def do_commit( 

2270 self, 

2271 message: Optional[bytes] = None, 

2272 committer: Optional[bytes] = None, 

2273 author: Optional[bytes] = None, 

2274 commit_timestamp=None, 

2275 commit_timezone=None, 

2276 author_timestamp=None, 

2277 author_timezone=None, 

2278 tree: Optional[ObjectID] = None, 

2279 encoding: Optional[bytes] = None, 

2280 ref: Optional[Ref] = b"HEAD", 

2281 merge_heads: Optional[list[ObjectID]] = None, 

2282 no_verify: bool = False, 

2283 sign: bool = False, 

2284 ): 

2285 """Create a new commit. 

2286 

2287 This is a simplified implementation for in-memory repositories that 

2288 doesn't support worktree operations or hooks. 

2289 

2290 Args: 

2291 message: Commit message 

2292 committer: Committer fullname 

2293 author: Author fullname 

2294 commit_timestamp: Commit timestamp (defaults to now) 

2295 commit_timezone: Commit timestamp timezone (defaults to GMT) 

2296 author_timestamp: Author timestamp (defaults to commit timestamp) 

2297 author_timezone: Author timestamp timezone (defaults to commit timezone) 

2298 tree: SHA1 of the tree root to use 

2299 encoding: Encoding 

2300 ref: Optional ref to commit to (defaults to current branch). 

2301 If None, creates a dangling commit without updating any ref. 

2302 merge_heads: Merge heads 

2303 no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo) 

2304 sign: GPG Sign the commit (ignored for MemoryRepo) 

2305 

2306 Returns: 

2307 New commit SHA1 

2308 """ 

2309 import time 

2310 

2311 from .objects import Commit 

2312 

2313 if tree is None: 

2314 raise ValueError("tree must be specified for MemoryRepo") 

2315 

2316 c = Commit() 

2317 if len(tree) != 40: 

2318 raise ValueError("tree must be a 40-byte hex sha string") 

2319 c.tree = tree 

2320 

2321 config = self.get_config_stack() 

2322 if merge_heads is None: 

2323 merge_heads = [] 

2324 if committer is None: 

2325 committer = get_user_identity(config, kind="COMMITTER") 

2326 check_user_identity(committer) 

2327 c.committer = committer 

2328 if commit_timestamp is None: 

2329 commit_timestamp = time.time() 

2330 c.commit_time = int(commit_timestamp) 

2331 if commit_timezone is None: 

2332 commit_timezone = 0 

2333 c.commit_timezone = commit_timezone 

2334 if author is None: 

2335 author = get_user_identity(config, kind="AUTHOR") 

2336 c.author = author 

2337 check_user_identity(author) 

2338 if author_timestamp is None: 

2339 author_timestamp = commit_timestamp 

2340 c.author_time = int(author_timestamp) 

2341 if author_timezone is None: 

2342 author_timezone = commit_timezone 

2343 c.author_timezone = author_timezone 

2344 if encoding is None: 

2345 try: 

2346 encoding = config.get(("i18n",), "commitEncoding") 

2347 except KeyError: 

2348 pass 

2349 if encoding is not None: 

2350 c.encoding = encoding 

2351 

2352 # Handle message (for MemoryRepo, we don't support callable messages) 

2353 if callable(message): 

2354 message = message(self, c) 

2355 if message is None: 

2356 raise ValueError("Message callback returned None") 

2357 

2358 if message is None: 

2359 raise ValueError("No commit message specified") 

2360 

2361 c.message = message 

2362 

2363 if ref is None: 

2364 # Create a dangling commit 

2365 c.parents = merge_heads 

2366 self.object_store.add_object(c) 

2367 else: 

2368 try: 

2369 old_head = self.refs[ref] 

2370 c.parents = [old_head, *merge_heads] 

2371 self.object_store.add_object(c) 

2372 ok = self.refs.set_if_equals( 

2373 ref, 

2374 old_head, 

2375 c.id, 

2376 message=b"commit: " + message, 

2377 committer=committer, 

2378 timestamp=commit_timestamp, 

2379 timezone=commit_timezone, 

2380 ) 

2381 except KeyError: 

2382 c.parents = merge_heads 

2383 self.object_store.add_object(c) 

2384 ok = self.refs.add_if_new( 

2385 ref, 

2386 c.id, 

2387 message=b"commit: " + message, 

2388 committer=committer, 

2389 timestamp=commit_timestamp, 

2390 timezone=commit_timezone, 

2391 ) 

2392 if not ok: 

2393 from .errors import CommitError 

2394 

2395 raise CommitError(f"{ref!r} changed during commit") 

2396 

2397 return c.id 

2398 

2399 @classmethod 

2400 def init_bare(cls, objects, refs, format: Optional[int] = None): 

2401 """Create a new bare repository in memory. 

2402 

2403 Args: 

2404 objects: Objects for the new repository, 

2405 as iterable 

2406 refs: Refs as dictionary, mapping names 

2407 to object SHA1s 

2408 format: Repository format version (defaults to 0) 

2409 """ 

2410 ret = cls() 

2411 for obj in objects: 

2412 ret.object_store.add_object(obj) 

2413 for refname, sha in refs.items(): 

2414 ret.refs.add_if_new(refname, sha) 

2415 ret._init_files(bare=True, format=format) 

2416 return ret