Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

960 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 Union, 

46) 

47 

48if TYPE_CHECKING: 

49 # There are no circular imports here, but we try to defer imports as long 

50 # as possible to reduce start-up time for anything that doesn't need 

51 # these imports. 

52 from .attrs import GitAttributes 

53 from .config import ConditionMatcher, ConfigFile, StackedConfig 

54 from .index import Index 

55 from .notes import Notes 

56 from .worktree import WorkTree 

57 

58from . import replace_me 

59from .errors import ( 

60 NoIndexPresent, 

61 NotBlobError, 

62 NotCommitError, 

63 NotGitRepository, 

64 NotTagError, 

65 NotTreeError, 

66 RefFormatError, 

67) 

68from .file import GitFile 

69from .hooks import ( 

70 CommitMsgShellHook, 

71 Hook, 

72 PostCommitShellHook, 

73 PostReceiveShellHook, 

74 PreCommitShellHook, 

75) 

76from .object_store import ( 

77 DiskObjectStore, 

78 MemoryObjectStore, 

79 MissingObjectFinder, 

80 ObjectStoreGraphWalker, 

81 PackBasedObjectStore, 

82 find_shallow, 

83 peel_sha, 

84) 

85from .objects import ( 

86 Blob, 

87 Commit, 

88 ObjectID, 

89 ShaFile, 

90 Tag, 

91 Tree, 

92 check_hexsha, 

93 valid_hexsha, 

94) 

95from .pack import generate_unpacked_objects 

96from .refs import ( 

97 ANNOTATED_TAG_SUFFIX, # noqa: F401 

98 LOCAL_BRANCH_PREFIX, 

99 LOCAL_TAG_PREFIX, # noqa: F401 

100 SYMREF, # noqa: F401 

101 DictRefsContainer, 

102 DiskRefsContainer, 

103 InfoRefsContainer, # noqa: F401 

104 Ref, 

105 RefsContainer, 

106 _set_default_branch, 

107 _set_head, 

108 _set_origin_head, 

109 check_ref_format, # noqa: F401 

110 read_packed_refs, # noqa: F401 

111 read_packed_refs_with_peeled, # noqa: F401 

112 serialize_refs, 

113 write_packed_refs, # noqa: F401 

114) 

115 

# Well-known file and directory names inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
# Worktree linkage files/directories (see gitrepository-layout(5)).
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Relative path segments of directories created inside a fresh control dir.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Name used for the initial branch of newly-created repositories.
DEFAULT_BRANCH = b"master"

136 

137 

class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        """Store the offending identity on the exception.

        Args:
          identity: The malformed identity bytestring.
        """
        super().__init__(identity)
        self.identity = identity

143 

144 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity could be determined."""

147 

148 

# TODO(jelmer): Cache?
def _get_default_identity() -> tuple[str, str]:
    """Determine a (fullname, email) identity from the host system.

    The username comes from the first of $LOGNAME/$USER/$LNAME/$USERNAME
    that is set, falling back to the passwd database where available.
    The full name comes from the passwd gecos field, falling back to the
    username. The email is $EMAIL, or ``username@hostname`` when unset.

    Returns: Tuple of (fullname, email) strings.
    Raises:
      DefaultIdentityNotFound: when no username could be determined.
    """
    import socket

    for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
        username = os.environ.get(name)
        if username:
            break
    else:
        username = None

    try:
        import pwd
    except ImportError:
        fullname = None
    else:
        try:
            entry = pwd.getpwuid(os.getuid())  # type: ignore
        except KeyError:
            fullname = None
        else:
            # Bug fix: the struct_passwd attribute is ``pw_gecos``;
            # the previous check ``getattr(entry, "gecos", None)`` probed a
            # nonexistent attribute and therefore always yielded None, so
            # the gecos full name was never actually used.
            if getattr(entry, "pw_gecos", None):
                # The gecos field is comma-separated; the first part is
                # the user's full name.
                fullname = entry.pw_gecos.split(",")[0]
            else:
                fullname = None
            if username is None:
                username = entry.pw_name
    if not fullname:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        fullname = username
    email = os.environ.get("EMAIL")
    if email is None:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        email = f"{username}@{socket.gethostname()}"
    return (fullname, email)

186 

187 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    Lookup order:

    1. GIT_${KIND}_NAME and GIT_${KIND}_EMAIL environment variables
       (only consulted when ``kind`` is given).
    2. The user.name and user.email settings from the specified
       configuration.
    3. The current user's identity as obtained from the host system
       (e.g. the gecos field, $EMAIL, $USER@$(hostname -f)).

    Args:
      config: Configuration stack to read user.name/user.email from.
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity of the form ``Name <email>``.
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Bug fix: only consult the host system when something is actually
        # missing. Previously _get_default_identity() ran unconditionally,
        # so it could raise DefaultIdentityNotFound (or do pointless work)
        # even when both user and email were already configured.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Strip one pair of surrounding angle brackets, if present.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

236 

237 

def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        fst, snd = identity.split(b" <", 1)
    except (ValueError, AttributeError) as exc:
        # Bug fix: AttributeError covers None (or other objects without
        # .split), which previously escaped as an unrelated exception
        # instead of InvalidUserIdentity.
        raise InvalidUserIdentity(identity) from exc
    if b">" not in snd:
        raise InvalidUserIdentity(identity)
    # NUL bytes and newlines would corrupt the commit object encoding.
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)

254 

255 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

      Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

      Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Validate every sha before recording the graft.
        check_hexsha(commit, "Invalid graftpoint")
        for parent in parents:
            check_hexsha(parent, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

287 

288 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    # A line is the commit sha followed by its parents, space-separated;
    # a parentless graft is just the commit sha on its own.
    lines = [
        b" ".join([commit, *parents]) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

308 

309 

310def _set_filesystem_hidden(path) -> None: 

311 """Mark path as to be hidden if supported by platform and filesystem. 

312 

313 On win32 uses SetFileAttributesW api: 

314 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

315 """ 

316 if sys.platform == "win32": 

317 import ctypes 

318 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

319 

320 FILE_ATTRIBUTE_HIDDEN = 2 

321 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

322 ("SetFileAttributesW", ctypes.windll.kernel32) 

323 ) 

324 

325 if isinstance(path, bytes): 

326 path = os.fsdecode(path) 

327 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

328 pass # Could raise or log `ctypes.WinError()` here 

329 

330 # Could implement other platform specific filesystem hiding here 

331 

332 

class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries.

    Lookup order in get_parents(): graft entries first, then the shallow
    set (shallow commits report no parents), then the commit graph when
    one is available, and finally the commit object itself.
    """

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a provider.

        Args:
          store: Object store used to load commits and the commit graph.
          grafts: Optional mapping of commit sha -> replacement parent list.
          shallows: Optional iterable of shallow commit shas.
        """
        self.store = store
        # Bug fix: avoid mutable default arguments ({} / []) — the previous
        # defaults were shared across every instance, so mutating one
        # provider's grafts leaked into all others created without grafts.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the list of parent shas for commit_id.

        Args:
          commit_id: SHA of the commit to look up.
          commit: Optional already-loaded commit object matching commit_id,
            used to avoid a store lookup.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            # Shallow commits have their history cut off.
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents

360 

361 

362class BaseRepo: 

363 """Base class for a git repository. 

364 

365 This base class is meant to be used for Repository implementations that e.g. 

366 work on top of a different transport than a standard filesystem path. 

367 

368 Attributes: 

369 object_store: Dictionary-like object for accessing 

370 the objects 

371 refs: Dictionary-like object with the refs in this 

372 repository 

373 """ 

374 

375 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None: 

376 """Open a repository. 

377 

378 This shouldn't be called directly, but rather through one of the 

379 base classes, such as MemoryRepo or Repo. 

380 

381 Args: 

382 object_store: Object store to use 

383 refs: Refs container to use 

384 """ 

385 self.object_store = object_store 

386 self.refs = refs 

387 

388 self._graftpoints: dict[bytes, list[bytes]] = {} 

389 self.hooks: dict[str, Hook] = {} 

390 

391 def _determine_file_mode(self) -> bool: 

392 """Probe the file-system to determine whether permissions can be trusted. 

393 

394 Returns: True if permissions can be trusted, False otherwise. 

395 """ 

396 raise NotImplementedError(self._determine_file_mode) 

397 

398 def _determine_symlinks(self) -> bool: 

399 """Probe the filesystem to determine whether symlinks can be created. 

400 

401 Returns: True if symlinks can be created, False otherwise. 

402 """ 

403 # For now, just mimic the old behaviour 

404 return sys.platform != "win32" 

405 

406 def _init_files( 

407 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

408 ) -> None: 

409 """Initialize a default set of named files.""" 

410 from .config import ConfigFile 

411 

412 self._put_named_file("description", b"Unnamed repository") 

413 f = BytesIO() 

414 cf = ConfigFile() 

415 if format is None: 

416 format = 0 

417 if format not in (0, 1): 

418 raise ValueError(f"Unsupported repository format version: {format}") 

419 cf.set("core", "repositoryformatversion", str(format)) 

420 if self._determine_file_mode(): 

421 cf.set("core", "filemode", True) 

422 else: 

423 cf.set("core", "filemode", False) 

424 

425 if symlinks is None and not bare: 

426 symlinks = self._determine_symlinks() 

427 

428 if symlinks is False: 

429 cf.set("core", "symlinks", symlinks) 

430 

431 cf.set("core", "bare", bare) 

432 cf.set("core", "logallrefupdates", True) 

433 cf.write_to_file(f) 

434 self._put_named_file("config", f.getvalue()) 

435 self._put_named_file(os.path.join("info", "exclude"), b"") 

436 

437 def get_named_file(self, path: str) -> Optional[BinaryIO]: 

438 """Get a file from the control dir with a specific name. 

439 

440 Although the filename should be interpreted as a filename relative to 

441 the control dir in a disk-based Repo, the object returned need not be 

442 pointing to a file in that location. 

443 

444 Args: 

445 path: The path to the file, relative to the control dir. 

446 Returns: An open file object, or None if the file does not exist. 

447 """ 

448 raise NotImplementedError(self.get_named_file) 

449 

450 def _put_named_file(self, path: str, contents: bytes) -> None: 

451 """Write a file to the control dir with the given name and contents. 

452 

453 Args: 

454 path: The path to the file, relative to the control dir. 

455 contents: A string to write to the file. 

456 """ 

457 raise NotImplementedError(self._put_named_file) 

458 

459 def _del_named_file(self, path: str) -> None: 

460 """Delete a file in the control directory with the given name.""" 

461 raise NotImplementedError(self._del_named_file) 

462 

463 def open_index(self) -> "Index": 

464 """Open the index for this repository. 

465 

466 Raises: 

467 NoIndexPresent: If no index is present 

468 Returns: The matching `Index` 

469 """ 

470 raise NotImplementedError(self.open_index) 

471 

472 def fetch( 

473 self, target, determine_wants=None, progress=None, depth: Optional[int] = None 

474 ): 

475 """Fetch objects into another repository. 

476 

477 Args: 

478 target: The target repository 

479 determine_wants: Optional function to determine what refs to 

480 fetch. 

481 progress: Optional progress function 

482 depth: Optional shallow fetch depth 

483 Returns: The local refs 

484 """ 

485 if determine_wants is None: 

486 determine_wants = target.object_store.determine_wants_all 

487 count, pack_data = self.fetch_pack_data( 

488 determine_wants, 

489 target.get_graph_walker(), 

490 progress=progress, 

491 depth=depth, 

492 ) 

493 target.object_store.add_pack_data(count, pack_data, progress) 

494 return self.get_refs() 

495 

496 def fetch_pack_data( 

497 self, 

498 determine_wants, 

499 graph_walker, 

500 progress, 

501 *, 

502 get_tagged=None, 

503 depth: Optional[int] = None, 

504 ): 

505 """Fetch the pack data required for a set of revisions. 

506 

507 Args: 

508 determine_wants: Function that takes a dictionary with heads 

509 and returns the list of heads to fetch. 

510 graph_walker: Object that can iterate over the list of revisions 

511 to fetch and has an "ack" method that will be called to acknowledge 

512 that a revision is present. 

513 progress: Simple progress function that will be called with 

514 updated progress strings. 

515 get_tagged: Function that returns a dict of pointed-to sha -> 

516 tag sha for including tags. 

517 depth: Shallow fetch depth 

518 Returns: count and iterator over pack data 

519 """ 

520 missing_objects = self.find_missing_objects( 

521 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

522 ) 

523 if missing_objects is None: 

524 return 0, iter([]) 

525 remote_has = missing_objects.get_remote_has() 

526 object_ids = list(missing_objects) 

527 return len(object_ids), generate_unpacked_objects( 

528 self.object_store, object_ids, progress=progress, other_haves=remote_has 

529 ) 

530 

    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        # Advertise our refs (serialized with peeled values) to the
        # wants callback.
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the shallow set before any depth negotiation mutates it.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): if depth is set but graph_walker lacks a
            # "shallow" attribute, ``unshallow`` stays unbound and its uses
            # below would raise NameError — presumably walkers always expose
            # "shallow" when depth is requested; confirm against callers.
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                # Stand-in that yields no objects, so callers produce an
                # empty pack without special-casing the no-wants path.
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        # Parent lookups during the walk must respect the shallow boundary.
        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

621 

622 def generate_pack_data( 

623 self, 

624 have: list[ObjectID], 

625 want: list[ObjectID], 

626 progress: Optional[Callable[[str], None]] = None, 

627 ofs_delta: Optional[bool] = None, 

628 ): 

629 """Generate pack data objects for a set of wants/haves. 

630 

631 Args: 

632 have: List of SHA1s of objects that should not be sent 

633 want: List of SHA1s of objects that should be sent 

634 ofs_delta: Whether OFS deltas can be included 

635 progress: Optional progress reporting method 

636 """ 

637 return self.object_store.generate_pack_data( 

638 have, 

639 want, 

640 shallow=self.get_shallow(), 

641 progress=progress, 

642 ofs_delta=ofs_delta, 

643 ) 

644 

645 def get_graph_walker( 

646 self, heads: Optional[list[ObjectID]] = None 

647 ) -> ObjectStoreGraphWalker: 

648 """Retrieve a graph walker. 

649 

650 A graph walker is used by a remote repository (or proxy) 

651 to find out which objects are present in this repository. 

652 

653 Args: 

654 heads: Repository heads to use (optional) 

655 Returns: A graph walker object 

656 """ 

657 if heads is None: 

658 heads = [ 

659 sha 

660 for sha in self.refs.as_dict(b"refs/heads").values() 

661 if sha in self.object_store 

662 ] 

663 parents_provider = ParentsProvider(self.object_store) 

664 return ObjectStoreGraphWalker( 

665 heads, 

666 parents_provider.get_parents, 

667 shallow=self.get_shallow(), 

668 update_shallow=self.update_shallow, 

669 ) 

670 

671 def get_refs(self) -> dict[bytes, bytes]: 

672 """Get dictionary with all refs. 

673 

674 Returns: A ``dict`` mapping ref names to SHA1s 

675 """ 

676 return self.refs.as_dict() 

677 

678 def head(self) -> bytes: 

679 """Return the SHA1 pointed at by HEAD.""" 

680 # TODO: move this method to WorkTree 

681 return self.refs[b"HEAD"] 

682 

683 def _get_object(self, sha, cls): 

684 assert len(sha) in (20, 40) 

685 ret = self.get_object(sha) 

686 if not isinstance(ret, cls): 

687 if cls is Commit: 

688 raise NotCommitError(ret) 

689 elif cls is Blob: 

690 raise NotBlobError(ret) 

691 elif cls is Tree: 

692 raise NotTreeError(ret) 

693 elif cls is Tag: 

694 raise NotTagError(ret) 

695 else: 

696 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

697 return ret 

698 

699 def get_object(self, sha: bytes) -> ShaFile: 

700 """Retrieve the object with the specified SHA. 

701 

702 Args: 

703 sha: SHA to retrieve 

704 Returns: A ShaFile object 

705 Raises: 

706 KeyError: when the object can not be found 

707 """ 

708 return self.object_store[sha] 

709 

710 def parents_provider(self) -> ParentsProvider: 

711 return ParentsProvider( 

712 self.object_store, 

713 grafts=self._graftpoints, 

714 shallows=self.get_shallow(), 

715 ) 

716 

717 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]: 

718 """Retrieve the parents of a specific commit. 

719 

720 If the specific commit is a graftpoint, the graft parents 

721 will be returned instead. 

722 

723 Args: 

724 sha: SHA of the commit for which to retrieve the parents 

725 commit: Optional commit matching the sha 

726 Returns: List of parents 

727 """ 

728 return self.parents_provider().get_parents(sha, commit) 

729 

730 def get_config(self) -> "ConfigFile": 

731 """Retrieve the config object. 

732 

733 Returns: `ConfigFile` object for the ``.git/config`` file. 

734 """ 

735 raise NotImplementedError(self.get_config) 

736 

737 def get_worktree_config(self) -> "ConfigFile": 

738 """Retrieve the worktree config object.""" 

739 raise NotImplementedError(self.get_worktree_config) 

740 

741 def get_description(self) -> Optional[str]: 

742 """Retrieve the description for this repository. 

743 

744 Returns: String with the description of the repository 

745 as set by the user. 

746 """ 

747 raise NotImplementedError(self.get_description) 

748 

749 def set_description(self, description) -> None: 

750 """Set the description for this repository. 

751 

752 Args: 

753 description: Text to set as description for this repository. 

754 """ 

755 raise NotImplementedError(self.set_description) 

756 

757 def get_rebase_state_manager(self): 

758 """Get the appropriate rebase state manager for this repository. 

759 

760 Returns: RebaseStateManager instance 

761 """ 

762 raise NotImplementedError(self.get_rebase_state_manager) 

763 

764 def get_blob_normalizer(self): 

765 """Return a BlobNormalizer object for checkin/checkout operations. 

766 

767 Returns: BlobNormalizer instance 

768 """ 

769 raise NotImplementedError(self.get_blob_normalizer) 

770 

771 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

772 """Read gitattributes for the repository. 

773 

774 Args: 

775 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

776 

777 Returns: 

778 GitAttributes object that can be used to match paths 

779 """ 

780 raise NotImplementedError(self.get_gitattributes) 

781 

782 def get_config_stack(self) -> "StackedConfig": 

783 """Return a config stack for this repository. 

784 

785 This stack accesses the configuration for both this repository 

786 itself (.git/config) and the global configuration, which usually 

787 lives in ~/.gitconfig. 

788 

789 Returns: `Config` instance for this repository 

790 """ 

791 from .config import ConfigFile, StackedConfig 

792 

793 local_config = self.get_config() 

794 backends: list[ConfigFile] = [local_config] 

795 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

796 backends.append(self.get_worktree_config()) 

797 

798 backends += StackedConfig.default_backends() 

799 return StackedConfig(backends, writable=local_config) 

800 

801 def get_shallow(self) -> set[ObjectID]: 

802 """Get the set of shallow commits. 

803 

804 Returns: Set of shallow commits. 

805 """ 

806 f = self.get_named_file("shallow") 

807 if f is None: 

808 return set() 

809 with f: 

810 return {line.strip() for line in f} 

811 

812 def update_shallow(self, new_shallow, new_unshallow) -> None: 

813 """Update the list of shallow objects. 

814 

815 Args: 

816 new_shallow: Newly shallow objects 

817 new_unshallow: Newly no longer shallow objects 

818 """ 

819 shallow = self.get_shallow() 

820 if new_shallow: 

821 shallow.update(new_shallow) 

822 if new_unshallow: 

823 shallow.difference_update(new_unshallow) 

824 if shallow: 

825 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

826 else: 

827 self._del_named_file("shallow") 

828 

829 def get_peeled(self, ref: Ref) -> ObjectID: 

830 """Get the peeled value of a ref. 

831 

832 Args: 

833 ref: The refname to peel. 

834 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

835 intermediate tags; if the original ref does not point to a tag, 

836 this will equal the original SHA1. 

837 """ 

838 cached = self.refs.get_peeled(ref) 

839 if cached is not None: 

840 return cached 

841 return peel_sha(self.object_store, self.refs[ref])[1].id 

842 

843 @property 

844 def notes(self) -> "Notes": 

845 """Access notes functionality for this repository. 

846 

847 Returns: 

848 Notes object for accessing notes 

849 """ 

850 from .notes import Notes 

851 

852 return Notes(self.object_store, self.refs) 

853 

854 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs): 

855 """Obtain a walker for this repository. 

856 

857 Args: 

858 include: Iterable of SHAs of commits to include along with their 

859 ancestors. Defaults to [HEAD] 

860 

861 Keyword Args: 

862 exclude: Iterable of SHAs of commits to exclude along with their 

863 ancestors, overriding includes. 

864 order: ORDER_* constant specifying the order of results. 

865 Anything other than ORDER_DATE may result in O(n) memory usage. 

866 reverse: If True, reverse the order of output, requiring O(n) 

867 memory. 

868 max_entries: The maximum number of entries to yield, or None for 

869 no limit. 

870 paths: Iterable of file or subtree paths to show entries for. 

871 rename_detector: diff.RenameDetector object for detecting 

872 renames. 

873 follow: If True, follow path across renames/copies. Forces a 

874 default rename_detector. 

875 since: Timestamp to list commits after. 

876 until: Timestamp to list commits before. 

877 queue_cls: A class to use for a queue of commits, supporting the 

878 iterator protocol. The constructor takes a single argument, the 

879 Walker. 

880 

881 Returns: A `Walker` object 

882 """ 

883 from .walk import Walker 

884 

885 if include is None: 

886 include = [self.head()] 

887 

888 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit) 

889 

890 return Walker(self.object_store, include, **kwargs) 

891 

892 def __getitem__(self, name: Union[ObjectID, Ref]): 

893 """Retrieve a Git object by SHA1 or ref. 

894 

895 Args: 

896 name: A Git object SHA1 or a ref name 

897 Returns: A `ShaFile` object, such as a Commit or Blob 

898 Raises: 

899 KeyError: when the specified ref or object does not exist 

900 """ 

901 if not isinstance(name, bytes): 

902 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

903 if len(name) in (20, 40): 

904 try: 

905 return self.object_store[name] 

906 except (KeyError, ValueError): 

907 pass 

908 try: 

909 return self.object_store[self.refs[name]] 

910 except RefFormatError as exc: 

911 raise KeyError(name) from exc 

912 

913 def __contains__(self, name: bytes) -> bool: 

914 """Check if a specific Git object or ref is present. 

915 

916 Args: 

917 name: Git object SHA1 or ref name 

918 """ 

919 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

920 return name in self.object_store or name in self.refs 

921 else: 

922 return name in self.refs 

923 

924 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

925 """Set a ref. 

926 

927 Args: 

928 name: ref name 

929 value: Ref value - either a ShaFile object, or a hex sha 

930 """ 

931 if name.startswith(b"refs/") or name == b"HEAD": 

932 if isinstance(value, ShaFile): 

933 self.refs[name] = value.id 

934 elif isinstance(value, bytes): 

935 self.refs[name] = value 

936 else: 

937 raise TypeError(value) 

938 else: 

939 raise ValueError(name) 

940 

941 def __delitem__(self, name: bytes) -> None: 

942 """Remove a ref. 

943 

944 Args: 

945 name: Name of the ref to remove 

946 """ 

947 if name.startswith(b"refs/") or name == b"HEAD": 

948 del self.refs[name] 

949 else: 

950 raise ValueError(name) 

951 

952 def _get_user_identity( 

953 self, config: "StackedConfig", kind: Optional[str] = None 

954 ) -> bytes: 

955 """Determine the identity to use for new commits.""" 

956 warnings.warn( 

957 "use get_user_identity() rather than Repo._get_user_identity", 

958 DeprecationWarning, 

959 ) 

960 return get_user_identity(config) 

961 

962 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

963 """Add or modify graftpoints. 

964 

965 Args: 

966 updated_graftpoints: Dict of commit shas to list of parent shas 

967 """ 

968 # Simple validation 

969 for commit, parents in updated_graftpoints.items(): 

970 for sha in [commit, *parents]: 

971 check_hexsha(sha, "Invalid graftpoint") 

972 

973 self._graftpoints.update(updated_graftpoints) 

974 

975 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None: 

976 """Remove graftpoints. 

977 

978 Args: 

979 to_remove: List of commit shas 

980 """ 

981 for sha in to_remove: 

982 del self._graftpoints[sha] 

983 

984 def _read_heads(self, name): 

985 f = self.get_named_file(name) 

986 if f is None: 

987 return [] 

988 with f: 

989 return [line.strip() for line in f.readlines() if line.strip()] 

990 

991 def get_worktree(self) -> "WorkTree": 

992 """Get the working tree for this repository. 

993 

994 Returns: 

995 WorkTree instance for performing working tree operations 

996 

997 Raises: 

998 NotImplementedError: If the repository doesn't support working trees 

999 """ 

1000 raise NotImplementedError( 

1001 "Working tree operations not supported by this repository type" 

1002 ) 

1003 

    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        Deprecated: this method only forwards every argument to
        ``self.get_worktree().commit(...)``; call that directly instead.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
            message: Commit message (bytes or callable that takes (repo, commit)
                and returns bytes)
            committer: Committer fullname
            author: Author fullname
            commit_timestamp: Commit timestamp (defaults to now)
            commit_timezone: Commit timestamp timezone (defaults to GMT)
            author_timestamp: Author timestamp (defaults to commit
                timestamp)
            author_timezone: Author timestamp timezone
                (defaults to commit timestamp timezone)
            tree: SHA1 of the tree root to use (if not specified the
                current index will be committed).
            encoding: Encoding
            ref: Optional ref to commit to (defaults to current branch).
                If None, creates a dangling commit without updating any ref.
            merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
            no_verify: Skip pre-commit and commit-msg hooks
            sign: GPG Sign the commit (bool, defaults to False,
                pass True to use default GPG key,
                pass a str containing Key ID to use a specific GPG key)

        Returns:
            New commit SHA1
        """
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )

1067 

1068 

def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
        f: File-like object to read from

    Returns:
        The path after the "gitdir: " prefix, without a trailing newline

    Raises:
        ValueError: If the content does not start with "gitdir: "
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(prefix) :].rstrip("\n")

1082 

1083 

class UnsupportedVersion(Exception):
    """Raised when a repository format version is not supported."""

    def __init__(self, version) -> None:
        """Store the offending format version on the exception."""
        # Intentionally no super().__init__() call, matching the original:
        # BaseException.__new__ already captured the constructor args.
        self.version = version

1089 

1090 

class UnsupportedExtension(Exception):
    """Raised when a repository extension is not supported."""

    def __init__(self, extension) -> None:
        """Store the offending extension name on the exception."""
        self.extension = extension

1096 

1097 

1098class Repo(BaseRepo): 

1099 """A git repository backed by local disk. 

1100 

1101 To open an existing repository, call the constructor with 

1102 the path of the repository. 

1103 

1104 To create a new repository, use the Repo.init class method. 

1105 

1106 Note that a repository object may hold on to resources such 

1107 as file handles for performance reasons; call .close() to free 

1108 up those resources. 

1109 

1110 Attributes: 

1111 path: Path to the working copy (if it exists) or repository control 

1112 directory (if the repository is bare) 

1113 bare: Whether this is a bare repository 

1114 """ 

1115 

1116 path: str 

1117 bare: bool 

1118 object_store: DiskObjectStore 

1119 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
            root: Path to the repository's root.
            object_store: ObjectStore to use; if omitted, we use the
                repository's default object store
            bare: True if this is a bare repository; autodetected when None.

        Raises:
            NotGitRepository: If no repository is found at ``root``.
            UnsupportedVersion: If core.repositoryformatversion is not 0 or 1.
            UnsupportedExtension: If an unknown repository extension is set.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file/dir under root means non-bare; an
            # objects/ and refs/ directory directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a gitfile pointing at the real control directory
                # (e.g. a linked worktree or submodule checkout).
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # The shallow file uses the same line format as grafts, so it is
        # folded into the same graftpoints mapping.
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

1238 

1239 def get_worktree(self) -> "WorkTree": 

1240 """Get the working tree for this repository. 

1241 

1242 Returns: 

1243 WorkTree instance for performing working tree operations 

1244 """ 

1245 from .worktree import WorkTree 

1246 

1247 return WorkTree(self, self.path) 

1248 

1249 def _write_reflog( 

1250 self, ref, old_sha, new_sha, committer, timestamp, timezone, message 

1251 ) -> None: 

1252 from .reflog import format_reflog_line 

1253 

1254 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1255 try: 

1256 os.makedirs(os.path.dirname(path)) 

1257 except FileExistsError: 

1258 pass 

1259 if committer is None: 

1260 config = self.get_config_stack() 

1261 committer = get_user_identity(config) 

1262 check_user_identity(committer) 

1263 if timestamp is None: 

1264 timestamp = int(time.time()) 

1265 if timezone is None: 

1266 timezone = 0 # FIXME 

1267 with open(path, "ab") as f: 

1268 f.write( 

1269 format_reflog_line( 

1270 old_sha, new_sha, committer, timestamp, timezone, message 

1271 ) 

1272 + b"\n" 

1273 ) 

1274 

1275 def read_reflog(self, ref): 

1276 """Read reflog entries for a reference. 

1277 

1278 Args: 

1279 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1280 

1281 Yields: 

1282 reflog.Entry objects in chronological order (oldest first) 

1283 """ 

1284 from .reflog import read_reflog 

1285 

1286 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1287 try: 

1288 with open(path, "rb") as f: 

1289 yield from read_reflog(f) 

1290 except FileNotFoundError: 

1291 return 

1292 

1293 @classmethod 

1294 def discover(cls, start="."): 

1295 """Iterate parent directories to discover a repository. 

1296 

1297 Return a Repo object for the first parent directory that looks like a 

1298 Git repository. 

1299 

1300 Args: 

1301 start: The directory to start discovery from (defaults to '.') 

1302 """ 

1303 remaining = True 

1304 path = os.path.abspath(start) 

1305 while remaining: 

1306 try: 

1307 return cls(path) 

1308 except NotGitRepository: 

1309 path, remaining = os.path.split(path) 

1310 raise NotGitRepository( 

1311 "No git repository was found at {path}".format(**dict(path=start)) 

1312 ) 

1313 

1314 def controldir(self): 

1315 """Return the path of the control directory.""" 

1316 return self._controldir 

1317 

1318 def commondir(self): 

1319 """Return the path of the common directory. 

1320 

1321 For a main working tree, it is identical to controldir(). 

1322 

1323 For a linked working tree, it is the control directory of the 

1324 main working tree. 

1325 """ 

1326 return self._commondir 

1327 

1328 def _determine_file_mode(self): 

1329 """Probe the file-system to determine whether permissions can be trusted. 

1330 

1331 Returns: True if permissions can be trusted, False otherwise. 

1332 """ 

1333 fname = os.path.join(self.path, ".probe-permissions") 

1334 with open(fname, "w") as f: 

1335 f.write("") 

1336 

1337 st1 = os.lstat(fname) 

1338 try: 

1339 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1340 except PermissionError: 

1341 return False 

1342 st2 = os.lstat(fname) 

1343 

1344 os.unlink(fname) 

1345 

1346 mode_differs = st1.st_mode != st2.st_mode 

1347 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1348 

1349 return mode_differs and st2_has_exec 

1350 

1351 def _determine_symlinks(self): 

1352 """Probe the filesystem to determine whether symlinks can be created. 

1353 

1354 Returns: True if symlinks can be created, False otherwise. 

1355 """ 

1356 # TODO(jelmer): Actually probe disk / look at filesystem 

1357 return sys.platform != "win32" 

1358 

1359 def _put_named_file(self, path, contents) -> None: 

1360 """Write a file to the control dir with the given name and contents. 

1361 

1362 Args: 

1363 path: The path to the file, relative to the control dir. 

1364 contents: A string to write to the file. 

1365 """ 

1366 path = path.lstrip(os.path.sep) 

1367 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1368 f.write(contents) 

1369 

1370 def _del_named_file(self, path) -> None: 

1371 try: 

1372 os.unlink(os.path.join(self.controldir(), path)) 

1373 except FileNotFoundError: 

1374 return 

1375 

1376 def get_named_file(self, path, basedir=None): 

1377 """Get a file from the control dir with a specific name. 

1378 

1379 Although the filename should be interpreted as a filename relative to 

1380 the control dir in a disk-based Repo, the object returned need not be 

1381 pointing to a file in that location. 

1382 

1383 Args: 

1384 path: The path to the file, relative to the control dir. 

1385 basedir: Optional argument that specifies an alternative to the 

1386 control dir. 

1387 Returns: An open file object, or None if the file does not exist. 

1388 """ 

1389 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1390 # the dumb web serving code. 

1391 if basedir is None: 

1392 basedir = self.controldir() 

1393 path = path.lstrip(os.path.sep) 

1394 try: 

1395 return open(os.path.join(basedir, path), "rb") 

1396 except FileNotFoundError: 

1397 return None 

1398 

1399 def index_path(self): 

1400 """Return path to the index file.""" 

1401 return os.path.join(self.controldir(), INDEX_FILENAME) 

1402 

1403 def open_index(self) -> "Index": 

1404 """Open the index for this repository. 

1405 

1406 Raises: 

1407 NoIndexPresent: If no index is present 

1408 Returns: The matching `Index` 

1409 """ 

1410 from .index import Index 

1411 

1412 if not self.has_index(): 

1413 raise NoIndexPresent 

1414 

1415 # Check for manyFiles feature configuration 

1416 config = self.get_config_stack() 

1417 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1418 skip_hash = False 

1419 index_version = None 

1420 

1421 if many_files: 

1422 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1423 try: 

1424 index_version_str = config.get(b"index", b"version") 

1425 index_version = int(index_version_str) 

1426 except KeyError: 

1427 index_version = 4 # Default to version 4 for manyFiles 

1428 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1429 else: 

1430 # Check for explicit index settings 

1431 try: 

1432 index_version_str = config.get(b"index", b"version") 

1433 index_version = int(index_version_str) 

1434 except KeyError: 

1435 index_version = None 

1436 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1437 

1438 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1439 

1440 def has_index(self) -> bool: 

1441 """Check if an index is present.""" 

1442 # Bare repos must never have index files; non-bare repos may have a 

1443 # missing index file, which is treated as empty. 

1444 return not self.bare 

1445 

    @replace_me(remove_in="0.26.0")
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        Deprecated: delegates to ``self.get_worktree().stage(...)``; call
        that directly instead.

        Args:
            fs_paths: List of paths, relative to the repository path
        """
        return self.get_worktree().stage(fs_paths)

1459 

    @replace_me(remove_in="0.26.0")
    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific file in the index.

        Deprecated: delegates to ``self.get_worktree().unstage(...)``; call
        that directly instead.

        Args:
            fs_paths: a list of files to unstage,
                relative to the repository path.
        """
        return self.get_worktree().unstage(fs_paths)

1468 

    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
            target_path: Target path
            mkdir: Create the target directory
            bare: Whether to create a bare repository
            checkout: Whether or not to check-out HEAD after cloning
            origin: Base name for refs in target repository
                cloned from this repository
            branch: Optional branch or tag to be used as HEAD in the new repository
                instead of this repository's HEAD.
            progress: Optional progress function
            depth: Depth at which to fetch
            symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
            ValueError: If ``bare`` and ``checkout`` are both requested.
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Non-bare clones check out by default.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Configure the new repo's remote to point back at this one.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror local branches under refs/remotes/<origin>/ and
                # copy tags verbatim.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                    head_ref = _set_default_branch(
                        target.refs, origin, origin_head, branch, ref_message
                    )

                    # Update target head
                    if head_ref:
                        head = _set_head(target.refs, head_ref, ref_message)
                    else:
                        head = None

                    if checkout and head is not None:
                        target.get_worktree().reset_index()
            except BaseException:
                # Close the partially-initialized target before re-raising.
                target.close()
                raise
        except BaseException:
            # Remove the target directory only if we created it ourselves.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1563 

    @replace_me(remove_in="0.26.0")
    def reset_index(self, tree: Optional[bytes] = None):
        """Reset the index back to a specific tree.

        Deprecated: delegates to ``self.get_worktree().reset_index(...)``;
        call that directly instead.

        Args:
            tree: Tree SHA to reset to, None for current HEAD tree.
        """
        return self.get_worktree().reset_index(tree)

1572 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Supports "gitdir:", "gitdir/i:" and "onbranch:" includeIf conditions.
        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # A bare pattern matches at any depth.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything beneath that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. empty repository): never matches.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1641 

1642 def get_worktree_config(self) -> "ConfigFile": 

1643 from .config import ConfigFile 

1644 

1645 path = os.path.join(self.commondir(), "config.worktree") 

1646 try: 

1647 # Pass condition matchers for includeIf evaluation 

1648 condition_matchers = self._get_config_condition_matchers() 

1649 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1650 except FileNotFoundError: 

1651 cf = ConfigFile() 

1652 cf.path = path 

1653 return cf 

1654 

1655 def get_config(self) -> "ConfigFile": 

1656 """Retrieve the config object. 

1657 

1658 Returns: `ConfigFile` object for the ``.git/config`` file. 

1659 """ 

1660 from .config import ConfigFile 

1661 

1662 path = os.path.join(self._commondir, "config") 

1663 try: 

1664 # Pass condition matchers for includeIf evaluation 

1665 condition_matchers = self._get_config_condition_matchers() 

1666 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1667 except FileNotFoundError: 

1668 ret = ConfigFile() 

1669 ret.path = path 

1670 return ret 

1671 

1672 def get_rebase_state_manager(self): 

1673 """Get the appropriate rebase state manager for this repository. 

1674 

1675 Returns: DiskRebaseStateManager instance 

1676 """ 

1677 import os 

1678 

1679 from .rebase import DiskRebaseStateManager 

1680 

1681 path = os.path.join(self.controldir(), "rebase-merge") 

1682 return DiskRebaseStateManager(path) 

1683 

1684 def get_description(self): 

1685 """Retrieve the description of this repository. 

1686 

1687 Returns: A string describing the repository or None. 

1688 """ 

1689 path = os.path.join(self._controldir, "description") 

1690 try: 

1691 with GitFile(path, "rb") as f: 

1692 return f.read() 

1693 except FileNotFoundError: 

1694 return None 

1695 

1696 def __repr__(self) -> str: 

1697 return f"<Repo at {self.path!r}>" 

1698 

1699 def set_description(self, description) -> None: 

1700 """Set the description for this repository. 

1701 

1702 Args: 

1703 description: Text to set as description for this repository. 

1704 """ 

1705 self._put_named_file("description", description) 

1706 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Args:
            path: Repository (working tree) path
            controldir: Control directory (same as ``path`` for bare repos)
            bare: Whether the repository is bare
            object_store: Object store to use; created on disk when None
            config: Config to consult for ``init.defaultBranch``
            default_branch: Initial branch name (overrides config)
            symlinks: Whether symlinks are supported (autodetect when None)
            format: Repository format version (defaults to 0)
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        # Create the standard control-dir skeleton.
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # Point HEAD at the (possibly not-yet-existing) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1742 

1743 @classmethod 

1744 def init( 

1745 cls, 

1746 path: Union[str, bytes, os.PathLike], 

1747 *, 

1748 mkdir: bool = False, 

1749 config=None, 

1750 default_branch=None, 

1751 symlinks: Optional[bool] = None, 

1752 format: Optional[int] = None, 

1753 ) -> "Repo": 

1754 """Create a new repository. 

1755 

1756 Args: 

1757 path: Path in which to create the repository 

1758 mkdir: Whether to create the directory 

1759 format: Repository format version (defaults to 0) 

1760 Returns: `Repo` instance 

1761 """ 

1762 path = os.fspath(path) 

1763 if isinstance(path, bytes): 

1764 path = os.fsdecode(path) 

1765 if mkdir: 

1766 os.mkdir(path) 

1767 controldir = os.path.join(path, CONTROLDIR) 

1768 os.mkdir(controldir) 

1769 _set_filesystem_hidden(controldir) 

1770 return cls._init_maybe_bare( 

1771 path, 

1772 controldir, 

1773 False, 

1774 config=config, 

1775 default_branch=default_branch, 

1776 symlinks=symlinks, 

1777 format=format, 

1778 ) 

1779 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
            path: Path in which to create the working tree.
            main_repo: Main repository to reference
            identifier: Worktree identifier (defaults to the basename of
                ``path``)
            mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The new tree's .git is a gitfile pointing into the main repo's
        # worktrees/<identifier> directory.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Back-link from the worktree control dir to the working tree,
        # and to the shared (common) control dir.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the new worktree at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r

1828 

1829 @classmethod 

1830 def init_bare( 

1831 cls, 

1832 path: Union[str, bytes, os.PathLike], 

1833 *, 

1834 mkdir=False, 

1835 object_store=None, 

1836 config=None, 

1837 default_branch=None, 

1838 format: Optional[int] = None, 

1839 ): 

1840 """Create a new bare repository. 

1841 

1842 ``path`` should already exist and be an empty directory. 

1843 

1844 Args: 

1845 path: Path to create bare repository in 

1846 format: Repository format version (defaults to 0) 

1847 Returns: a `Repo` instance 

1848 """ 

1849 path = os.fspath(path) 

1850 if isinstance(path, bytes): 

1851 path = os.fsdecode(path) 

1852 if mkdir: 

1853 os.mkdir(path) 

1854 return cls._init_maybe_bare( 

1855 path, 

1856 path, 

1857 True, 

1858 object_store=object_store, 

1859 config=config, 

1860 default_branch=default_branch, 

1861 format=format, 

1862 ) 

1863 

1864 create = init_bare 

1865 

1866 def close(self) -> None: 

1867 """Close any files opened by this repository.""" 

1868 self.object_store.close() 

1869 

    def __enter__(self):
        """Enter the context manager, returning the repository itself."""
        return self

1872 

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager, closing any open file handles."""
        self.close()

1875 

1876 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

1877 """Read .gitattributes file from working tree. 

1878 

1879 Returns: 

1880 Dictionary mapping file patterns to attributes 

1881 """ 

1882 gitattributes = {} 

1883 gitattributes_path = os.path.join(self.path, ".gitattributes") 

1884 

1885 if os.path.exists(gitattributes_path): 

1886 with open(gitattributes_path, "rb") as f: 

1887 for line in f: 

1888 line = line.strip() 

1889 if not line or line.startswith(b"#"): 

1890 continue 

1891 

1892 parts = line.split() 

1893 if len(parts) < 2: 

1894 continue 

1895 

1896 pattern = parts[0] 

1897 attrs = {} 

1898 

1899 for attr in parts[1:]: 

1900 if attr.startswith(b"-"): 

1901 # Unset attribute 

1902 attrs[attr[1:]] = b"false" 

1903 elif b"=" in attr: 

1904 # Set to value 

1905 key, value = attr.split(b"=", 1) 

1906 attrs[key] = value 

1907 else: 

1908 # Set attribute 

1909 attrs[attr] = b"true" 

1910 

1911 gitattributes[pattern] = attrs 

1912 

1913 return gitattributes 

1914 

1915 def get_blob_normalizer(self): 

1916 """Return a BlobNormalizer object.""" 

1917 from .filters import FilterBlobNormalizer, FilterRegistry 

1918 

1919 # Get proper GitAttributes object 

1920 git_attributes = self.get_gitattributes() 

1921 config_stack = self.get_config_stack() 

1922 

1923 # Create FilterRegistry with repo reference 

1924 filter_registry = FilterRegistry(config_stack, self) 

1925 

1926 # Return FilterBlobNormalizer which handles all filters including line endings 

1927 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

1928 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Pattern sources are appended in this order: tree/HEAD
        ``.gitattributes``, then ``.git/info/attributes``, then the working
        directory ``.gitattributes``; later sources therefore take
        precedence when patterns overlap.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)

1993 

1994 @replace_me(remove_in="0.26.0") 

1995 def _sparse_checkout_file_path(self) -> str: 

1996 """Return the path of the sparse-checkout file in this repo's control dir.""" 

1997 return self.get_worktree()._sparse_checkout_file_path() 

1998 

1999 @replace_me(remove_in="0.26.0") 

2000 def configure_for_cone_mode(self) -> None: 

2001 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

2002 return self.get_worktree().configure_for_cone_mode() 

2003 

2004 @replace_me(remove_in="0.26.0") 

2005 def infer_cone_mode(self) -> bool: 

2006 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

2007 return self.get_worktree().infer_cone_mode() 

2008 

2009 @replace_me(remove_in="0.26.0") 

2010 def get_sparse_checkout_patterns(self) -> list[str]: 

2011 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

2012 

2013 Returns: 

2014 A list of patterns. Returns an empty list if the file is missing. 

2015 """ 

2016 return self.get_worktree().get_sparse_checkout_patterns() 

2017 

2018 @replace_me(remove_in="0.26.0") 

2019 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None: 

2020 """Write the given sparse-checkout patterns into info/sparse-checkout. 

2021 

2022 Creates the info/ directory if it does not exist. 

2023 

2024 Args: 

2025 patterns: A list of gitignore-style patterns to store. 

2026 """ 

2027 return self.get_worktree().set_sparse_checkout_patterns(patterns) 

2028 

2029 @replace_me(remove_in="0.26.0") 

2030 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None: 

2031 """Write the given cone-mode directory patterns into info/sparse-checkout. 

2032 

2033 For each directory to include, add an inclusion line that "undoes" the prior 

2034 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

2035 Never add the same line twice. 

2036 """ 

2037 return self.get_worktree().set_cone_mode_patterns(dirs) 

2038 

2039 

class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries captured by _append_reflog (the refs logger below).
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        # Contents of "control dir" files, keyed by relative path.
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None

2058 

2059 def _append_reflog(self, *args) -> None: 

2060 self._reflog.append(args) 

2061 

2062 def set_description(self, description) -> None: 

2063 self._description = description 

2064 

2065 def get_description(self): 

2066 return self._description 

2067 

2068 def _determine_file_mode(self): 

2069 """Probe the file-system to determine whether permissions can be trusted. 

2070 

2071 Returns: True if permissions can be trusted, False otherwise. 

2072 """ 

2073 return sys.platform != "win32" 

2074 

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise.
        """
        # No real filesystem to probe; assume symlink support except on Windows.
        return sys.platform != "win32"

2081 

2082 def _put_named_file(self, path, contents) -> None: 

2083 """Write a file to the control dir with the given name and contents. 

2084 

2085 Args: 

2086 path: The path to the file, relative to the control dir. 

2087 contents: A string to write to the file. 

2088 """ 

2089 self._named_files[path] = contents 

2090 

2091 def _del_named_file(self, path) -> None: 

2092 try: 

2093 del self._named_files[path] 

2094 except KeyError: 

2095 pass 

2096 

2097 def get_named_file(self, path, basedir=None): 

2098 """Get a file from the control dir with a specific name. 

2099 

2100 Although the filename should be interpreted as a filename relative to 

2101 the control dir in a disk-baked Repo, the object returned need not be 

2102 pointing to a file in that location. 

2103 

2104 Args: 

2105 path: The path to the file, relative to the control dir. 

2106 Returns: An open file object, or None if the file does not exist. 

2107 """ 

2108 contents = self._named_files.get(path, None) 

2109 if contents is None: 

2110 return None 

2111 return BytesIO(contents) 

2112 

2113 def open_index(self) -> "Index": 

2114 """Fail to open index for this repo, since it is bare. 

2115 

2116 Raises: 

2117 NoIndexPresent: Raised when no index is present 

2118 """ 

2119 raise NoIndexPresent 

2120 

2121 def get_config(self): 

2122 """Retrieve the config object. 

2123 

2124 Returns: `ConfigFile` object. 

2125 """ 

2126 return self._config 

2127 

2128 def get_rebase_state_manager(self): 

2129 """Get the appropriate rebase state manager for this repository. 

2130 

2131 Returns: MemoryRebaseStateManager instance 

2132 """ 

2133 from .rebase import MemoryRebaseStateManager 

2134 

2135 return MemoryRebaseStateManager(self) 

2136 

2137 def get_blob_normalizer(self): 

2138 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2139 from .filters import FilterBlobNormalizer, FilterRegistry 

2140 

2141 # Get GitAttributes object 

2142 git_attributes = self.get_gitattributes() 

2143 config_stack = self.get_config_stack() 

2144 

2145 # Create FilterRegistry with repo reference 

2146 filter_registry = FilterRegistry(config_stack, self) 

2147 

2148 # Return FilterBlobNormalizer which handles all filters 

2149 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

2150 

2151 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

2152 """Read gitattributes for the repository.""" 

2153 from .attrs import GitAttributes 

2154 

2155 # Memory repos don't have working trees or gitattributes files 

2156 # Return empty GitAttributes 

2157 return GitAttributes([]) 

2158 

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: if no tree is given, the tree sha is malformed, or no
            commit message is supplied (directly or via callback)
          CommitError: if *ref* changed concurrently during the commit
        """
        import time

        from .objects import Commit

        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        # Trees are referenced by 40-byte hex sha, not binary sha.
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            # Fall back to the identity configured in user.name/user.email.
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            # Author defaults to a separately-configurable identity.
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author time defaults to the commit time.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                # No configured encoding: leave the commit's encoding unset.
                pass
        if encoding is not None:
            c.encoding = encoding

        # Handle message (for MemoryRepo, we don't support callable messages)
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                # Existing ref: the old head becomes the first parent, and
                # the ref is only advanced if it still points at old_head
                # (compare-and-swap against concurrent updates).
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet (e.g. unborn branch): create it,
                # but only if nobody created it in the meantime.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id

2288 

2289 @classmethod 

2290 def init_bare(cls, objects, refs, format: Optional[int] = None): 

2291 """Create a new bare repository in memory. 

2292 

2293 Args: 

2294 objects: Objects for the new repository, 

2295 as iterable 

2296 refs: Refs as dictionary, mapping names 

2297 to object SHA1s 

2298 format: Repository format version (defaults to 0) 

2299 """ 

2300 ret = cls() 

2301 for obj in objects: 

2302 ret.object_store.add_object(obj) 

2303 for refname, sha in refs.items(): 

2304 ret.refs.add_if_new(refname, sha) 

2305 ret._init_files(bare=True, format=format) 

2306 return ret