Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

955 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 Union, 

46) 

47 

48if TYPE_CHECKING: 

49 # There are no circular imports here, but we try to defer imports as long 

50 # as possible to reduce start-up time for anything that doesn't need 

51 # these imports. 

52 from .attrs import GitAttributes 

53 from .config import ConditionMatcher, ConfigFile, StackedConfig 

54 from .index import Index 

55 from .notes import Notes 

56 from .worktree import WorkTree 

57 

58from . import replace_me 

59from .errors import ( 

60 NoIndexPresent, 

61 NotBlobError, 

62 NotCommitError, 

63 NotGitRepository, 

64 NotTagError, 

65 NotTreeError, 

66 RefFormatError, 

67) 

68from .file import GitFile 

69from .hooks import ( 

70 CommitMsgShellHook, 

71 Hook, 

72 PostCommitShellHook, 

73 PostReceiveShellHook, 

74 PreCommitShellHook, 

75) 

76from .object_store import ( 

77 DiskObjectStore, 

78 MemoryObjectStore, 

79 MissingObjectFinder, 

80 ObjectStoreGraphWalker, 

81 PackBasedObjectStore, 

82 find_shallow, 

83 peel_sha, 

84) 

85from .objects import ( 

86 Blob, 

87 Commit, 

88 ObjectID, 

89 ShaFile, 

90 Tag, 

91 Tree, 

92 check_hexsha, 

93 valid_hexsha, 

94) 

95from .pack import generate_unpacked_objects 

96from .refs import ( 

97 ANNOTATED_TAG_SUFFIX, # noqa: F401 

98 LOCAL_BRANCH_PREFIX, 

99 LOCAL_TAG_PREFIX, # noqa: F401 

100 SYMREF, # noqa: F401 

101 DictRefsContainer, 

102 DiskRefsContainer, 

103 InfoRefsContainer, # noqa: F401 

104 Ref, 

105 RefsContainer, 

106 _set_default_branch, 

107 _set_head, 

108 _set_origin_head, 

109 check_ref_format, # noqa: F401 

110 read_packed_refs, # noqa: F401 

111 read_packed_refs_with_peeled, # noqa: F401 

112 serialize_refs, 

113 write_packed_refs, # noqa: F401 

114) 

115 

# Name of the control directory inside a non-bare repository.
CONTROLDIR = ".git"
# Subdirectory of the control dir holding loose and packed objects.
OBJECTDIR = "objects"
# Subdirectory of the control dir holding refs.
REFSDIR = "refs"
# Subdirectories of REFSDIR for tags and branch heads respectively.
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
# Name of the index file inside the control dir.
INDEX_FILENAME = "index"
# Files/dirs used by linked worktrees ($GIT_DIR/worktrees/<name>/...).
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created (relative to the control dir) when initializing
# a new repository.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used for HEAD in newly created repositories.
DEFAULT_BRANCH = b"master"

136 

137 

class InvalidUserIdentity(Exception):
    """Raised when a user identity string is not of the form 'user <email>'."""

    def __init__(self, identity) -> None:
        # Keep the offending identity around so callers can report it.
        self.identity = identity

143 

144 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined from the host."""

147 

148 

149# TODO(jelmer): Cache? 

150def _get_default_identity() -> tuple[str, str]: 

151 import socket 

152 

153 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

154 username = os.environ.get(name) 

155 if username: 

156 break 

157 else: 

158 username = None 

159 

160 try: 

161 import pwd 

162 except ImportError: 

163 fullname = None 

164 else: 

165 try: 

166 entry = pwd.getpwuid(os.getuid()) # type: ignore 

167 except KeyError: 

168 fullname = None 

169 else: 

170 if getattr(entry, "gecos", None): 

171 fullname = entry.pw_gecos.split(",")[0] 

172 else: 

173 fullname = None 

174 if username is None: 

175 username = entry.pw_name 

176 if not fullname: 

177 if username is None: 

178 raise DefaultIdentityNotFound("no username found") 

179 fullname = username 

180 email = os.environ.get("EMAIL") 

181 if email is None: 

182 if username is None: 

183 raise DefaultIdentityNotFound("no username found") 

184 email = f"{username}@{socket.gethostname()}" 

185 return (fullname, email) 

186 

187 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    Lookup order: the ``GIT_<KIND>_NAME`` / ``GIT_<KIND>_EMAIL`` environment
    variables (when ``kind`` is given), then the ``user.name`` /
    ``user.email`` configuration values, then the host's default identity
    from :func:`_get_default_identity`.

    Args:
      config: Configuration stack to read user.name/user.email from.
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity bytestring of the form ``name <email>``.
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        env_user = os.environ.get("GIT_" + kind + "_NAME")
        if env_user is not None:
            user = env_user.encode("utf-8")
        env_email = os.environ.get("GIT_" + kind + "_EMAIL")
        if env_email is not None:
            email = env_email.encode("utf-8")

    def _config_value(name: str) -> Optional[bytes]:
        # Missing config keys are reported as KeyError by the config stack.
        try:
            return config.get(("user",), name)
        except KeyError:
            return None

    if user is None:
        user = _config_value("name")
    if email is None:
        email = _config_value("email")
    # Note: the host default is computed unconditionally, matching the
    # original behaviour (it may raise DefaultIdentityNotFound).
    default_user, default_email = _get_default_identity()
    if user is None:
        user = default_user.encode("utf-8")
    if email is None:
        email = default_email.encode("utf-8")
    # Strip surrounding angle brackets from a configured "<email>".
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

236 

237 

def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``Name <email>`` and contains neither
    NUL bytes nor newlines.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    parts = identity.split(b" <", 1)
    if len(parts) != 2:
        raise InvalidUserIdentity(identity)
    if b">" not in parts[1]:
        raise InvalidUserIdentity(identity)
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)

254 

255 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoint lines into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
      <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        # Anything after the commit sha is a whitespace-separated parent list.
        parents = fields[1].split() if len(fields) == 2 else []
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")
        grafts[commit] = parents
    return grafts

287 

288 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into a bytestring.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each emitted line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

308 

309 

310def _set_filesystem_hidden(path) -> None: 

311 """Mark path as to be hidden if supported by platform and filesystem. 

312 

313 On win32 uses SetFileAttributesW api: 

314 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

315 """ 

316 if sys.platform == "win32": 

317 import ctypes 

318 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

319 

320 FILE_ATTRIBUTE_HIDDEN = 2 

321 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

322 ("SetFileAttributesW", ctypes.windll.kernel32) 

323 ) 

324 

325 if isinstance(path, bytes): 

326 path = os.fsdecode(path) 

327 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

328 pass # Could raise or log `ctypes.WinError()` here 

329 

330 # Could implement other platform specific filesystem hiding here 

331 

332 

class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries.

    Lookup order: graftpoints, shallow cut-off (no parents), the
    commit-graph index when available, and finally the commit object
    itself.
    """

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a parents provider.

        Args:
          store: Object store used to load commits and the commit graph.
          grafts: Optional mapping of commit sha -> list of parent shas.
          shallows: Optional iterable of shallow commit shas.
        """
        self.store = store
        # Fix: the previous signature used mutable default arguments
        # (grafts={}, shallows=[]), which are shared between calls; use
        # None sentinels and build fresh containers instead.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the parent shas of ``commit_id``.

        Args:
          commit_id: SHA of the commit to look up.
          commit: Optional already-loaded commit object to avoid a store read.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        # Shallow commits have their history cut off.
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents

360 

361 

362class BaseRepo: 

363 """Base class for a git repository. 

364 

365 This base class is meant to be used for Repository implementations that e.g. 

366 work on top of a different transport than a standard filesystem path. 

367 

368 Attributes: 

369 object_store: Dictionary-like object for accessing 

370 the objects 

371 refs: Dictionary-like object with the refs in this 

372 repository 

373 """ 

374 

375 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None: 

376 """Open a repository. 

377 

378 This shouldn't be called directly, but rather through one of the 

379 base classes, such as MemoryRepo or Repo. 

380 

381 Args: 

382 object_store: Object store to use 

383 refs: Refs container to use 

384 """ 

385 self.object_store = object_store 

386 self.refs = refs 

387 

388 self._graftpoints: dict[bytes, list[bytes]] = {} 

389 self.hooks: dict[str, Hook] = {} 

390 

391 def _determine_file_mode(self) -> bool: 

392 """Probe the file-system to determine whether permissions can be trusted. 

393 

394 Returns: True if permissions can be trusted, False otherwise. 

395 """ 

396 raise NotImplementedError(self._determine_file_mode) 

397 

398 def _determine_symlinks(self) -> bool: 

399 """Probe the filesystem to determine whether symlinks can be created. 

400 

401 Returns: True if symlinks can be created, False otherwise. 

402 """ 

403 # For now, just mimic the old behaviour 

404 return sys.platform != "win32" 

405 

406 def _init_files( 

407 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

408 ) -> None: 

409 """Initialize a default set of named files.""" 

410 from .config import ConfigFile 

411 

412 self._put_named_file("description", b"Unnamed repository") 

413 f = BytesIO() 

414 cf = ConfigFile() 

415 if format is None: 

416 format = 0 

417 if format not in (0, 1): 

418 raise ValueError(f"Unsupported repository format version: {format}") 

419 cf.set("core", "repositoryformatversion", str(format)) 

420 if self._determine_file_mode(): 

421 cf.set("core", "filemode", True) 

422 else: 

423 cf.set("core", "filemode", False) 

424 

425 if symlinks is None and not bare: 

426 symlinks = self._determine_symlinks() 

427 

428 if symlinks is False: 

429 cf.set("core", "symlinks", symlinks) 

430 

431 cf.set("core", "bare", bare) 

432 cf.set("core", "logallrefupdates", True) 

433 cf.write_to_file(f) 

434 self._put_named_file("config", f.getvalue()) 

435 self._put_named_file(os.path.join("info", "exclude"), b"") 

436 

437 def get_named_file(self, path: str) -> Optional[BinaryIO]: 

438 """Get a file from the control dir with a specific name. 

439 

440 Although the filename should be interpreted as a filename relative to 

441 the control dir in a disk-based Repo, the object returned need not be 

442 pointing to a file in that location. 

443 

444 Args: 

445 path: The path to the file, relative to the control dir. 

446 Returns: An open file object, or None if the file does not exist. 

447 """ 

448 raise NotImplementedError(self.get_named_file) 

449 

450 def _put_named_file(self, path: str, contents: bytes) -> None: 

451 """Write a file to the control dir with the given name and contents. 

452 

453 Args: 

454 path: The path to the file, relative to the control dir. 

455 contents: A string to write to the file. 

456 """ 

457 raise NotImplementedError(self._put_named_file) 

458 

459 def _del_named_file(self, path: str) -> None: 

460 """Delete a file in the control directory with the given name.""" 

461 raise NotImplementedError(self._del_named_file) 

462 

463 def open_index(self) -> "Index": 

464 """Open the index for this repository. 

465 

466 Raises: 

467 NoIndexPresent: If no index is present 

468 Returns: The matching `Index` 

469 """ 

470 raise NotImplementedError(self.open_index) 

471 

472 def fetch( 

473 self, target, determine_wants=None, progress=None, depth: Optional[int] = None 

474 ): 

475 """Fetch objects into another repository. 

476 

477 Args: 

478 target: The target repository 

479 determine_wants: Optional function to determine what refs to 

480 fetch. 

481 progress: Optional progress function 

482 depth: Optional shallow fetch depth 

483 Returns: The local refs 

484 """ 

485 if determine_wants is None: 

486 determine_wants = target.object_store.determine_wants_all 

487 count, pack_data = self.fetch_pack_data( 

488 determine_wants, 

489 target.get_graph_walker(), 

490 progress=progress, 

491 depth=depth, 

492 ) 

493 target.object_store.add_pack_data(count, pack_data, progress) 

494 return self.get_refs() 

495 

496 def fetch_pack_data( 

497 self, 

498 determine_wants, 

499 graph_walker, 

500 progress, 

501 *, 

502 get_tagged=None, 

503 depth: Optional[int] = None, 

504 ): 

505 """Fetch the pack data required for a set of revisions. 

506 

507 Args: 

508 determine_wants: Function that takes a dictionary with heads 

509 and returns the list of heads to fetch. 

510 graph_walker: Object that can iterate over the list of revisions 

511 to fetch and has an "ack" method that will be called to acknowledge 

512 that a revision is present. 

513 progress: Simple progress function that will be called with 

514 updated progress strings. 

515 get_tagged: Function that returns a dict of pointed-to sha -> 

516 tag sha for including tags. 

517 depth: Shallow fetch depth 

518 Returns: count and iterator over pack data 

519 """ 

520 missing_objects = self.find_missing_objects( 

521 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

522 ) 

523 if missing_objects is None: 

524 return 0, iter([]) 

525 remote_has = missing_objects.get_remote_has() 

526 object_ids = list(missing_objects) 

527 return len(object_ids), generate_unpacked_objects( 

528 self.object_store, object_ids, progress=progress, other_haves=remote_has 

529 ) 

530 

    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        # Present the refs (with peeled tag information) to the wants callback.
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot of the client's shallow commits before this request.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): if depth is set but graph_walker has no
            # ``shallow`` attribute, ``unshallow`` is left unbound and the
            # references below would raise NameError — confirm intended.
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Stand-in finder representing "nothing to send".
            class DummyMissingObjectFinder:
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            # Parent lookup routed through the provider so shallow cut-offs
            # are respected while walking.
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

621 

622 def generate_pack_data( 

623 self, 

624 have: list[ObjectID], 

625 want: list[ObjectID], 

626 progress: Optional[Callable[[str], None]] = None, 

627 ofs_delta: Optional[bool] = None, 

628 ): 

629 """Generate pack data objects for a set of wants/haves. 

630 

631 Args: 

632 have: List of SHA1s of objects that should not be sent 

633 want: List of SHA1s of objects that should be sent 

634 ofs_delta: Whether OFS deltas can be included 

635 progress: Optional progress reporting method 

636 """ 

637 return self.object_store.generate_pack_data( 

638 have, 

639 want, 

640 shallow=self.get_shallow(), 

641 progress=progress, 

642 ofs_delta=ofs_delta, 

643 ) 

644 

645 def get_graph_walker( 

646 self, heads: Optional[list[ObjectID]] = None 

647 ) -> ObjectStoreGraphWalker: 

648 """Retrieve a graph walker. 

649 

650 A graph walker is used by a remote repository (or proxy) 

651 to find out which objects are present in this repository. 

652 

653 Args: 

654 heads: Repository heads to use (optional) 

655 Returns: A graph walker object 

656 """ 

657 if heads is None: 

658 heads = [ 

659 sha 

660 for sha in self.refs.as_dict(b"refs/heads").values() 

661 if sha in self.object_store 

662 ] 

663 parents_provider = ParentsProvider(self.object_store) 

664 return ObjectStoreGraphWalker( 

665 heads, 

666 parents_provider.get_parents, 

667 shallow=self.get_shallow(), 

668 update_shallow=self.update_shallow, 

669 ) 

670 

671 def get_refs(self) -> dict[bytes, bytes]: 

672 """Get dictionary with all refs. 

673 

674 Returns: A ``dict`` mapping ref names to SHA1s 

675 """ 

676 return self.refs.as_dict() 

677 

678 def head(self) -> bytes: 

679 """Return the SHA1 pointed at by HEAD.""" 

680 return self.refs[b"HEAD"] 

681 

682 def _get_object(self, sha, cls): 

683 assert len(sha) in (20, 40) 

684 ret = self.get_object(sha) 

685 if not isinstance(ret, cls): 

686 if cls is Commit: 

687 raise NotCommitError(ret) 

688 elif cls is Blob: 

689 raise NotBlobError(ret) 

690 elif cls is Tree: 

691 raise NotTreeError(ret) 

692 elif cls is Tag: 

693 raise NotTagError(ret) 

694 else: 

695 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

696 return ret 

697 

698 def get_object(self, sha: bytes) -> ShaFile: 

699 """Retrieve the object with the specified SHA. 

700 

701 Args: 

702 sha: SHA to retrieve 

703 Returns: A ShaFile object 

704 Raises: 

705 KeyError: when the object can not be found 

706 """ 

707 return self.object_store[sha] 

708 

709 def parents_provider(self) -> ParentsProvider: 

710 return ParentsProvider( 

711 self.object_store, 

712 grafts=self._graftpoints, 

713 shallows=self.get_shallow(), 

714 ) 

715 

716 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]: 

717 """Retrieve the parents of a specific commit. 

718 

719 If the specific commit is a graftpoint, the graft parents 

720 will be returned instead. 

721 

722 Args: 

723 sha: SHA of the commit for which to retrieve the parents 

724 commit: Optional commit matching the sha 

725 Returns: List of parents 

726 """ 

727 return self.parents_provider().get_parents(sha, commit) 

728 

729 def get_config(self) -> "ConfigFile": 

730 """Retrieve the config object. 

731 

732 Returns: `ConfigFile` object for the ``.git/config`` file. 

733 """ 

734 raise NotImplementedError(self.get_config) 

735 

736 def get_worktree_config(self) -> "ConfigFile": 

737 """Retrieve the worktree config object.""" 

738 raise NotImplementedError(self.get_worktree_config) 

739 

740 def get_description(self) -> Optional[str]: 

741 """Retrieve the description for this repository. 

742 

743 Returns: String with the description of the repository 

744 as set by the user. 

745 """ 

746 raise NotImplementedError(self.get_description) 

747 

748 def set_description(self, description) -> None: 

749 """Set the description for this repository. 

750 

751 Args: 

752 description: Text to set as description for this repository. 

753 """ 

754 raise NotImplementedError(self.set_description) 

755 

756 def get_rebase_state_manager(self): 

757 """Get the appropriate rebase state manager for this repository. 

758 

759 Returns: RebaseStateManager instance 

760 """ 

761 raise NotImplementedError(self.get_rebase_state_manager) 

762 

763 def get_blob_normalizer(self): 

764 """Return a BlobNormalizer object for checkin/checkout operations. 

765 

766 Returns: BlobNormalizer instance 

767 """ 

768 raise NotImplementedError(self.get_blob_normalizer) 

769 

770 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

771 """Read gitattributes for the repository. 

772 

773 Args: 

774 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

775 

776 Returns: 

777 GitAttributes object that can be used to match paths 

778 """ 

779 raise NotImplementedError(self.get_gitattributes) 

780 

781 def get_config_stack(self) -> "StackedConfig": 

782 """Return a config stack for this repository. 

783 

784 This stack accesses the configuration for both this repository 

785 itself (.git/config) and the global configuration, which usually 

786 lives in ~/.gitconfig. 

787 

788 Returns: `Config` instance for this repository 

789 """ 

790 from .config import ConfigFile, StackedConfig 

791 

792 local_config = self.get_config() 

793 backends: list[ConfigFile] = [local_config] 

794 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

795 backends.append(self.get_worktree_config()) 

796 

797 backends += StackedConfig.default_backends() 

798 return StackedConfig(backends, writable=local_config) 

799 

800 def get_shallow(self) -> set[ObjectID]: 

801 """Get the set of shallow commits. 

802 

803 Returns: Set of shallow commits. 

804 """ 

805 f = self.get_named_file("shallow") 

806 if f is None: 

807 return set() 

808 with f: 

809 return {line.strip() for line in f} 

810 

811 def update_shallow(self, new_shallow, new_unshallow) -> None: 

812 """Update the list of shallow objects. 

813 

814 Args: 

815 new_shallow: Newly shallow objects 

816 new_unshallow: Newly no longer shallow objects 

817 """ 

818 shallow = self.get_shallow() 

819 if new_shallow: 

820 shallow.update(new_shallow) 

821 if new_unshallow: 

822 shallow.difference_update(new_unshallow) 

823 if shallow: 

824 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

825 else: 

826 self._del_named_file("shallow") 

827 

828 def get_peeled(self, ref: Ref) -> ObjectID: 

829 """Get the peeled value of a ref. 

830 

831 Args: 

832 ref: The refname to peel. 

833 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

834 intermediate tags; if the original ref does not point to a tag, 

835 this will equal the original SHA1. 

836 """ 

837 cached = self.refs.get_peeled(ref) 

838 if cached is not None: 

839 return cached 

840 return peel_sha(self.object_store, self.refs[ref])[1].id 

841 

842 @property 

843 def notes(self) -> "Notes": 

844 """Access notes functionality for this repository. 

845 

846 Returns: 

847 Notes object for accessing notes 

848 """ 

849 from .notes import Notes 

850 

851 return Notes(self.object_store, self.refs) 

852 

853 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs): 

854 """Obtain a walker for this repository. 

855 

856 Args: 

857 include: Iterable of SHAs of commits to include along with their 

858 ancestors. Defaults to [HEAD] 

859 

860 Keyword Args: 

861 exclude: Iterable of SHAs of commits to exclude along with their 

862 ancestors, overriding includes. 

863 order: ORDER_* constant specifying the order of results. 

864 Anything other than ORDER_DATE may result in O(n) memory usage. 

865 reverse: If True, reverse the order of output, requiring O(n) 

866 memory. 

867 max_entries: The maximum number of entries to yield, or None for 

868 no limit. 

869 paths: Iterable of file or subtree paths to show entries for. 

870 rename_detector: diff.RenameDetector object for detecting 

871 renames. 

872 follow: If True, follow path across renames/copies. Forces a 

873 default rename_detector. 

874 since: Timestamp to list commits after. 

875 until: Timestamp to list commits before. 

876 queue_cls: A class to use for a queue of commits, supporting the 

877 iterator protocol. The constructor takes a single argument, the 

878 Walker. 

879 

880 Returns: A `Walker` object 

881 """ 

882 from .walk import Walker 

883 

884 if include is None: 

885 include = [self.head()] 

886 

887 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit) 

888 

889 return Walker(self.object_store, include, **kwargs) 

890 

891 def __getitem__(self, name: Union[ObjectID, Ref]): 

892 """Retrieve a Git object by SHA1 or ref. 

893 

894 Args: 

895 name: A Git object SHA1 or a ref name 

896 Returns: A `ShaFile` object, such as a Commit or Blob 

897 Raises: 

898 KeyError: when the specified ref or object does not exist 

899 """ 

900 if not isinstance(name, bytes): 

901 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

902 if len(name) in (20, 40): 

903 try: 

904 return self.object_store[name] 

905 except (KeyError, ValueError): 

906 pass 

907 try: 

908 return self.object_store[self.refs[name]] 

909 except RefFormatError as exc: 

910 raise KeyError(name) from exc 

911 

912 def __contains__(self, name: bytes) -> bool: 

913 """Check if a specific Git object or ref is present. 

914 

915 Args: 

916 name: Git object SHA1 or ref name 

917 """ 

918 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

919 return name in self.object_store or name in self.refs 

920 else: 

921 return name in self.refs 

922 

923 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

924 """Set a ref. 

925 

926 Args: 

927 name: ref name 

928 value: Ref value - either a ShaFile object, or a hex sha 

929 """ 

930 if name.startswith(b"refs/") or name == b"HEAD": 

931 if isinstance(value, ShaFile): 

932 self.refs[name] = value.id 

933 elif isinstance(value, bytes): 

934 self.refs[name] = value 

935 else: 

936 raise TypeError(value) 

937 else: 

938 raise ValueError(name) 

939 

940 def __delitem__(self, name: bytes) -> None: 

941 """Remove a ref. 

942 

943 Args: 

944 name: Name of the ref to remove 

945 """ 

946 if name.startswith(b"refs/") or name == b"HEAD": 

947 del self.refs[name] 

948 else: 

949 raise ValueError(name) 

950 

951 def _get_user_identity( 

952 self, config: "StackedConfig", kind: Optional[str] = None 

953 ) -> bytes: 

954 """Determine the identity to use for new commits.""" 

955 warnings.warn( 

956 "use get_user_identity() rather than Repo._get_user_identity", 

957 DeprecationWarning, 

958 ) 

959 return get_user_identity(config) 

960 

961 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

962 """Add or modify graftpoints. 

963 

964 Args: 

965 updated_graftpoints: Dict of commit shas to list of parent shas 

966 """ 

967 # Simple validation 

968 for commit, parents in updated_graftpoints.items(): 

969 for sha in [commit, *parents]: 

970 check_hexsha(sha, "Invalid graftpoint") 

971 

972 self._graftpoints.update(updated_graftpoints) 

973 

974 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None: 

975 """Remove graftpoints. 

976 

977 Args: 

978 to_remove: List of commit shas 

979 """ 

980 for sha in to_remove: 

981 del self._graftpoints[sha] 

982 

983 def _read_heads(self, name): 

984 f = self.get_named_file(name) 

985 if f is None: 

986 return [] 

987 with f: 

988 return [line.strip() for line in f.readlines() if line.strip()] 

989 

990 def get_worktree(self) -> "WorkTree": 

991 """Get the working tree for this repository. 

992 

993 Returns: 

994 WorkTree instance for performing working tree operations 

995 

996 Raises: 

997 NotImplementedError: If the repository doesn't support working trees 

998 """ 

999 raise NotImplementedError( 

1000 "Working tree operations not supported by this repository type" 

1001 ) 

1002 

1003 @replace_me() 

1004 def do_commit( 

1005 self, 

1006 message: Optional[bytes] = None, 

1007 committer: Optional[bytes] = None, 

1008 author: Optional[bytes] = None, 

1009 commit_timestamp=None, 

1010 commit_timezone=None, 

1011 author_timestamp=None, 

1012 author_timezone=None, 

1013 tree: Optional[ObjectID] = None, 

1014 encoding: Optional[bytes] = None, 

1015 ref: Optional[Ref] = b"HEAD", 

1016 merge_heads: Optional[list[ObjectID]] = None, 

1017 no_verify: bool = False, 

1018 sign: bool = False, 

1019 ): 

1020 """Create a new commit. 

1021 

1022 If not specified, committer and author default to 

1023 get_user_identity(..., 'COMMITTER') 

1024 and get_user_identity(..., 'AUTHOR') respectively. 

1025 

1026 Args: 

1027 message: Commit message (bytes or callable that takes (repo, commit) 

1028 and returns bytes) 

1029 committer: Committer fullname 

1030 author: Author fullname 

1031 commit_timestamp: Commit timestamp (defaults to now) 

1032 commit_timezone: Commit timestamp timezone (defaults to GMT) 

1033 author_timestamp: Author timestamp (defaults to commit 

1034 timestamp) 

1035 author_timezone: Author timestamp timezone 

1036 (defaults to commit timestamp timezone) 

1037 tree: SHA1 of the tree root to use (if not specified the 

1038 current index will be committed). 

1039 encoding: Encoding 

1040 ref: Optional ref to commit to (defaults to current branch). 

1041 If None, creates a dangling commit without updating any ref. 

1042 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

1043 no_verify: Skip pre-commit and commit-msg hooks 

1044 sign: GPG Sign the commit (bool, defaults to False, 

1045 pass True to use default GPG key, 

1046 pass a str containing Key ID to use a specific GPG key) 

1047 

1048 Returns: 

1049 New commit SHA1 

1050 """ 

1051 return self.get_worktree().commit( 

1052 message=message, 

1053 committer=committer, 

1054 author=author, 

1055 commit_timestamp=commit_timestamp, 

1056 commit_timezone=commit_timezone, 

1057 author_timestamp=author_timestamp, 

1058 author_timezone=author_timezone, 

1059 tree=tree, 

1060 encoding=encoding, 

1061 ref=ref, 

1062 merge_heads=merge_heads, 

1063 no_verify=no_verify, 

1064 sign=sign, 

1065 ) 

1066 

1067 

def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
        f: File-like object to read from
    Returns: A path

    Raises:
        ValueError: If the contents do not start with the expected prefix.
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Everything after the prefix, minus trailing newlines, is the path.
    return contents[len(prefix):].rstrip("\n")

1081 

1082 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        """Create the exception.

        Args:
            version: The unsupported ``core.repositoryformatversion`` value.
        """
        # Fixed: call Exception.__init__ so str(exc) and exc.args carry a
        # useful message (the original left args empty).
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1088 

1089 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        """Create the exception.

        Args:
            extension: The unsupported extension name (or description).
        """
        # Fixed: call Exception.__init__ so str(exc) and exc.args carry a
        # useful message (the original left args empty).
        super().__init__(f"Unsupported extension: {extension!r}")
        self.extension = extension

1095 

1096 

1097class Repo(BaseRepo): 

1098 """A git repository backed by local disk. 

1099 

1100 To open an existing repository, call the constructor with 

1101 the path of the repository. 

1102 

1103 To create a new repository, use the Repo.init class method. 

1104 

1105 Note that a repository object may hold on to resources such 

1106 as file handles for performance reasons; call .close() to free 

1107 up those resources. 

1108 

1109 Attributes: 

1110 path: Path to the working copy (if it exists) or repository control 

1111 directory (if the repository is bare) 

1112 bare: Whether this is a bare repository 

1113 """ 

1114 

1115 path: str 

1116 bare: bool 

1117 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
            root: Path to the repository's root.
            object_store: ObjectStore to use; if omitted, we use the
                repository's default object store
            bare: True if this is a bare repository.

        Raises:
            NotGitRepository: If no repository layout is found at ``root``.
            UnsupportedVersion: If repositoryformatversion is not 0 or 1.
            UnsupportedExtension: If an unknown extensions.* key is set.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Auto-detect bareness: a .git file, or a .git dir containing
            # objects/, means non-bare; objects/ and refs/ directly under
            # root means bare; anything else is not a repository.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a file containing "gitdir: <path>" (worktree or
                # submodule layout); follow it to find the control dir.
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # "commondir" (if present) points linked worktrees at the main
        # repository's control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from info/grafts and the shallow file; both map
        # commit shas to replacement parent lists.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

1229 

1230 def get_worktree(self) -> "WorkTree": 

1231 """Get the working tree for this repository. 

1232 

1233 Returns: 

1234 WorkTree instance for performing working tree operations 

1235 """ 

1236 from .worktree import WorkTree 

1237 

1238 return WorkTree(self, self.path) 

1239 

1240 def _write_reflog( 

1241 self, ref, old_sha, new_sha, committer, timestamp, timezone, message 

1242 ) -> None: 

1243 from .reflog import format_reflog_line 

1244 

1245 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1246 try: 

1247 os.makedirs(os.path.dirname(path)) 

1248 except FileExistsError: 

1249 pass 

1250 if committer is None: 

1251 config = self.get_config_stack() 

1252 committer = get_user_identity(config) 

1253 check_user_identity(committer) 

1254 if timestamp is None: 

1255 timestamp = int(time.time()) 

1256 if timezone is None: 

1257 timezone = 0 # FIXME 

1258 with open(path, "ab") as f: 

1259 f.write( 

1260 format_reflog_line( 

1261 old_sha, new_sha, committer, timestamp, timezone, message 

1262 ) 

1263 + b"\n" 

1264 ) 

1265 

1266 def read_reflog(self, ref): 

1267 """Read reflog entries for a reference. 

1268 

1269 Args: 

1270 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1271 

1272 Yields: 

1273 reflog.Entry objects in chronological order (oldest first) 

1274 """ 

1275 from .reflog import read_reflog 

1276 

1277 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1278 try: 

1279 with open(path, "rb") as f: 

1280 yield from read_reflog(f) 

1281 except FileNotFoundError: 

1282 return 

1283 

1284 @classmethod 

1285 def discover(cls, start="."): 

1286 """Iterate parent directories to discover a repository. 

1287 

1288 Return a Repo object for the first parent directory that looks like a 

1289 Git repository. 

1290 

1291 Args: 

1292 start: The directory to start discovery from (defaults to '.') 

1293 """ 

1294 remaining = True 

1295 path = os.path.abspath(start) 

1296 while remaining: 

1297 try: 

1298 return cls(path) 

1299 except NotGitRepository: 

1300 path, remaining = os.path.split(path) 

1301 raise NotGitRepository( 

1302 "No git repository was found at {path}".format(**dict(path=start)) 

1303 ) 

1304 

    def controldir(self) -> str:
        """Return the path of the control directory (usually ``.git``)."""
        return self._controldir

1308 

    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        return self._commondir

1318 

1319 def _determine_file_mode(self): 

1320 """Probe the file-system to determine whether permissions can be trusted. 

1321 

1322 Returns: True if permissions can be trusted, False otherwise. 

1323 """ 

1324 fname = os.path.join(self.path, ".probe-permissions") 

1325 with open(fname, "w") as f: 

1326 f.write("") 

1327 

1328 st1 = os.lstat(fname) 

1329 try: 

1330 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1331 except PermissionError: 

1332 return False 

1333 st2 = os.lstat(fname) 

1334 

1335 os.unlink(fname) 

1336 

1337 mode_differs = st1.st_mode != st2.st_mode 

1338 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1339 

1340 return mode_differs and st2_has_exec 

1341 

1342 def _determine_symlinks(self): 

1343 """Probe the filesystem to determine whether symlinks can be created. 

1344 

1345 Returns: True if symlinks can be created, False otherwise. 

1346 """ 

1347 # TODO(jelmer): Actually probe disk / look at filesystem 

1348 return sys.platform != "win32" 

1349 

1350 def _put_named_file(self, path, contents) -> None: 

1351 """Write a file to the control dir with the given name and contents. 

1352 

1353 Args: 

1354 path: The path to the file, relative to the control dir. 

1355 contents: A string to write to the file. 

1356 """ 

1357 path = path.lstrip(os.path.sep) 

1358 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1359 f.write(contents) 

1360 

1361 def _del_named_file(self, path) -> None: 

1362 try: 

1363 os.unlink(os.path.join(self.controldir(), path)) 

1364 except FileNotFoundError: 

1365 return 

1366 

1367 def get_named_file(self, path, basedir=None): 

1368 """Get a file from the control dir with a specific name. 

1369 

1370 Although the filename should be interpreted as a filename relative to 

1371 the control dir in a disk-based Repo, the object returned need not be 

1372 pointing to a file in that location. 

1373 

1374 Args: 

1375 path: The path to the file, relative to the control dir. 

1376 basedir: Optional argument that specifies an alternative to the 

1377 control dir. 

1378 Returns: An open file object, or None if the file does not exist. 

1379 """ 

1380 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1381 # the dumb web serving code. 

1382 if basedir is None: 

1383 basedir = self.controldir() 

1384 path = path.lstrip(os.path.sep) 

1385 try: 

1386 return open(os.path.join(basedir, path), "rb") 

1387 except FileNotFoundError: 

1388 return None 

1389 

    def index_path(self):
        """Return path to the index file.

        The file itself may not exist yet; see :meth:`open_index`.
        """
        return os.path.join(self.controldir(), INDEX_FILENAME)

1393 

1394 def open_index(self) -> "Index": 

1395 """Open the index for this repository. 

1396 

1397 Raises: 

1398 NoIndexPresent: If no index is present 

1399 Returns: The matching `Index` 

1400 """ 

1401 from .index import Index 

1402 

1403 if not self.has_index(): 

1404 raise NoIndexPresent 

1405 

1406 # Check for manyFiles feature configuration 

1407 config = self.get_config_stack() 

1408 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1409 skip_hash = False 

1410 index_version = None 

1411 

1412 if many_files: 

1413 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1414 try: 

1415 index_version_str = config.get(b"index", b"version") 

1416 index_version = int(index_version_str) 

1417 except KeyError: 

1418 index_version = 4 # Default to version 4 for manyFiles 

1419 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1420 else: 

1421 # Check for explicit index settings 

1422 try: 

1423 index_version_str = config.get(b"index", b"version") 

1424 index_version = int(index_version_str) 

1425 except KeyError: 

1426 index_version = None 

1427 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1428 

1429 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1430 

    def has_index(self) -> bool:
        """Check if an index is present.

        Returns: True for non-bare repositories, False for bare ones.
        """
        # Bare repos must never have index files; non-bare repos may have a
        # missing index file, which is treated as empty.
        return not self.bare

1436 

1437 @replace_me() 

1438 def stage( 

1439 self, 

1440 fs_paths: Union[ 

1441 str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]] 

1442 ], 

1443 ) -> None: 

1444 """Stage a set of paths. 

1445 

1446 Args: 

1447 fs_paths: List of paths, relative to the repository path 

1448 """ 

1449 return self.get_worktree().stage(fs_paths) 

1450 

1451 @replace_me() 

1452 def unstage(self, fs_paths: list[str]) -> None: 

1453 """Unstage specific file in the index 

1454 Args: 

1455 fs_paths: a list of files to unstage, 

1456 relative to the repository path. 

1457 """ 

1458 return self.get_worktree().unstage(fs_paths) 

1459 

    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
            target_path: Target path
            mkdir: Create the target directory
            bare: Whether to create a bare repository
            checkout: Whether or not to check-out HEAD after cloning
            origin: Base name for refs in target repository
                cloned from this repository
            branch: Optional branch or tag to be used as HEAD in the new repository
                instead of this repository's HEAD.
            progress: Optional progress function
            depth: Depth at which to fetch
            symlinks: Symlinks setting (default to autodetect)

        Raises:
            ValueError: If both ``bare`` and ``checkout`` are requested.

        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        # Outer try: on any failure, remove the directory we created.
        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            # Inner try: on any failure, close the half-initialized target.
            try:
                # Point the new repository's remote "origin" at this one.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror our branches under refs/remotes/<origin>, copy tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                    head_ref = _set_default_branch(
                        target.refs, origin, origin_head, branch, ref_message
                    )

                    # Update target head
                    if head_ref:
                        head = _set_head(target.refs, head_ref, ref_message)
                    else:
                        head = None

                    if checkout and head is not None:
                        target.reset_index()
            except BaseException:
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1554 

1555 @replace_me() 

1556 def reset_index(self, tree: Optional[bytes] = None): 

1557 """Reset the index back to a specific tree. 

1558 

1559 Args: 

1560 tree: Tree SHA to reset to, None for current HEAD tree. 

1561 """ 

1562 return self.get_worktree().reset_index(tree) 

1563 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix ("gitdir:", "gitdir/i:",
        "onbranch:") to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path (e.g. "C:...")
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere below any directory.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches the directory and everything in it.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. unborn branch): condition never matches.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1632 

1633 def get_worktree_config(self) -> "ConfigFile": 

1634 from .config import ConfigFile 

1635 

1636 path = os.path.join(self.commondir(), "config.worktree") 

1637 try: 

1638 # Pass condition matchers for includeIf evaluation 

1639 condition_matchers = self._get_config_condition_matchers() 

1640 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1641 except FileNotFoundError: 

1642 cf = ConfigFile() 

1643 cf.path = path 

1644 return cf 

1645 

1646 def get_config(self) -> "ConfigFile": 

1647 """Retrieve the config object. 

1648 

1649 Returns: `ConfigFile` object for the ``.git/config`` file. 

1650 """ 

1651 from .config import ConfigFile 

1652 

1653 path = os.path.join(self._commondir, "config") 

1654 try: 

1655 # Pass condition matchers for includeIf evaluation 

1656 condition_matchers = self._get_config_condition_matchers() 

1657 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1658 except FileNotFoundError: 

1659 ret = ConfigFile() 

1660 ret.path = path 

1661 return ret 

1662 

1663 def get_rebase_state_manager(self): 

1664 """Get the appropriate rebase state manager for this repository. 

1665 

1666 Returns: DiskRebaseStateManager instance 

1667 """ 

1668 import os 

1669 

1670 from .rebase import DiskRebaseStateManager 

1671 

1672 path = os.path.join(self.controldir(), "rebase-merge") 

1673 return DiskRebaseStateManager(path) 

1674 

1675 def get_description(self): 

1676 """Retrieve the description of this repository. 

1677 

1678 Returns: A string describing the repository or None. 

1679 """ 

1680 path = os.path.join(self._controldir, "description") 

1681 try: 

1682 with GitFile(path, "rb") as f: 

1683 return f.read() 

1684 except FileNotFoundError: 

1685 return None 

1686 

    def __repr__(self) -> str:
        """Return a debug representation including the on-disk path."""
        return f"<Repo at {self.path!r}>"

1689 

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
            description: Text to set as description for this repository.
        """
        # Stored as the "description" file in the control directory.
        self._put_named_file("description", description)

1697 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Args:
            path: Repository root (the working tree for non-bare repos).
            controldir: Control directory (same as ``path`` for bare repos).
            bare: Whether the repository is bare.
            object_store: Optional object store; created on disk if None.
            config: Optional config used to look up ``init.defaultBranch``.
            default_branch: Branch HEAD should point at; when None, looked
                up in config with DEFAULT_BRANCH as fallback.
            symlinks: Whether symlinks are supported (autodetected if None).
            format: Repository format version (defaults to 0).

        Returns: A `Repo` instance for the newly initialized repository.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        # Create the standard control-directory skeleton.
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # HEAD starts as a symbolic ref to the (possibly unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1733 

1734 @classmethod 

1735 def init( 

1736 cls, 

1737 path: Union[str, bytes, os.PathLike], 

1738 *, 

1739 mkdir: bool = False, 

1740 config=None, 

1741 default_branch=None, 

1742 symlinks: Optional[bool] = None, 

1743 format: Optional[int] = None, 

1744 ) -> "Repo": 

1745 """Create a new repository. 

1746 

1747 Args: 

1748 path: Path in which to create the repository 

1749 mkdir: Whether to create the directory 

1750 format: Repository format version (defaults to 0) 

1751 Returns: `Repo` instance 

1752 """ 

1753 path = os.fspath(path) 

1754 if isinstance(path, bytes): 

1755 path = os.fsdecode(path) 

1756 if mkdir: 

1757 os.mkdir(path) 

1758 controldir = os.path.join(path, CONTROLDIR) 

1759 os.mkdir(controldir) 

1760 _set_filesystem_hidden(controldir) 

1761 return cls._init_maybe_bare( 

1762 path, 

1763 controldir, 

1764 False, 

1765 config=config, 

1766 default_branch=default_branch, 

1767 symlinks=symlinks, 

1768 format=format, 

1769 ) 

1770 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
            path: Path in which to create the working tree.
            main_repo: Main repository to reference
            identifier: Worktree identifier
            mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Layout: <main>/.git/worktrees/<identifier> becomes this worktree's
        # control directory, and <path>/.git is a file pointing at it.
        main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Cross-link: "gitdir" points back to the .git file, "commondir"
        # points at the main repository's control directory.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # The worktree starts at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(path)
        # Populate the new worktree's index (and files) from HEAD.
        r.reset_index()
        return r

1817 

    @classmethod
    def init_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir=False,
        object_store=None,
        config=None,
        default_branch=None,
        format: Optional[int] = None,
    ):
        """Create a new bare repository.

        ``path`` should already exist and be an empty directory.

        Args:
            path: Path to create bare repository in
            mkdir: Whether to create the directory first
            object_store: Optional object store to use
            config: Optional config used for the init.defaultBranch lookup
            default_branch: Optional branch that HEAD should point at
            format: Repository format version (defaults to 0)
        Returns: a `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # For a bare repository, the control directory IS the repository root.
        return cls._init_maybe_bare(
            path,
            path,
            True,
            object_store=object_store,
            config=config,
            default_branch=default_branch,
            format=format,
        )

    # Backwards-compatible alias for init_bare.
    create = init_bare

1854 

    def close(self) -> None:
        """Close any files opened by this repository."""
        # Delegates to the object store, which holds the open file handles.
        self.object_store.close()

1858 

    def __enter__(self):
        """Enter the context manager, returning the repository itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager, releasing resources via close()."""
        self.close()

1864 

1865 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

1866 """Read .gitattributes file from working tree. 

1867 

1868 Returns: 

1869 Dictionary mapping file patterns to attributes 

1870 """ 

1871 gitattributes = {} 

1872 gitattributes_path = os.path.join(self.path, ".gitattributes") 

1873 

1874 if os.path.exists(gitattributes_path): 

1875 with open(gitattributes_path, "rb") as f: 

1876 for line in f: 

1877 line = line.strip() 

1878 if not line or line.startswith(b"#"): 

1879 continue 

1880 

1881 parts = line.split() 

1882 if len(parts) < 2: 

1883 continue 

1884 

1885 pattern = parts[0] 

1886 attrs = {} 

1887 

1888 for attr in parts[1:]: 

1889 if attr.startswith(b"-"): 

1890 # Unset attribute 

1891 attrs[attr[1:]] = b"false" 

1892 elif b"=" in attr: 

1893 # Set to value 

1894 key, value = attr.split(b"=", 1) 

1895 attrs[key] = value 

1896 else: 

1897 # Set attribute 

1898 attrs[attr] = b"true" 

1899 

1900 gitattributes[pattern] = attrs 

1901 

1902 return gitattributes 

1903 

1904 def get_blob_normalizer(self): 

1905 """Return a BlobNormalizer object.""" 

1906 from .filters import FilterBlobNormalizer, FilterRegistry 

1907 

1908 # Get proper GitAttributes object 

1909 git_attributes = self.get_gitattributes() 

1910 config_stack = self.get_config_stack() 

1911 

1912 # Create FilterRegistry with repo reference 

1913 filter_registry = FilterRegistry(config_stack, self) 

1914 

1915 # Return FilterBlobNormalizer which handles all filters including line endings 

1916 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

1917 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from: the .gitattributes blob in
        the given tree (or HEAD), .git/info/attributes, and the working
        directory's .gitattributes file.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)

1982 

    @replace_me()
    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir.

        Deprecated: delegates to the worktree.
        """
        return self.get_worktree()._sparse_checkout_file_path()

    @replace_me()
    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Deprecated: delegates to the worktree.
        """
        return self.get_worktree().configure_for_cone_mode()

    @replace_me()
    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

        Deprecated: delegates to the worktree.
        """
        return self.get_worktree().infer_cone_mode()

    @replace_me()
    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Deprecated: delegates to the worktree.

        Returns:
            A list of patterns. Returns an empty list if the file is missing.
        """
        return self.get_worktree().get_sparse_checkout_patterns()

    @replace_me()
    def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Deprecated: delegates to the worktree.

        Args:
            patterns: A list of gitignore-style patterns to store.
        """
        return self.get_worktree().set_sparse_checkout_patterns(patterns)

    @replace_me()
    def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        For each directory to include, add an inclusion line that "undoes" the prior
        ``!/*/`` 'exclude' that re-includes that directory and everything under it.
        Never add the same line twice.

        Deprecated: delegates to the worktree.
        """
        return self.get_worktree().set_cone_mode_patterns(dirs)

2027 

2028 

2029class MemoryRepo(BaseRepo): 

2030 """Repo that stores refs, objects, and named files in memory. 

2031 

2032 MemoryRepos are always bare: they have no working tree and no index, since 

2033 those have a stronger dependency on the filesystem. 

2034 """ 

2035 

2036 def __init__(self) -> None: 

2037 """Create a new repository in memory.""" 

2038 from .config import ConfigFile 

2039 

2040 self._reflog: list[Any] = [] 

2041 refs_container = DictRefsContainer({}, logger=self._append_reflog) 

2042 BaseRepo.__init__(self, MemoryObjectStore(), refs_container) # type: ignore 

2043 self._named_files: dict[str, bytes] = {} 

2044 self.bare = True 

2045 self._config = ConfigFile() 

2046 self._description = None 

2047 

2048 def _append_reflog(self, *args) -> None: 

2049 self._reflog.append(args) 

2050 

2051 def set_description(self, description) -> None: 

2052 self._description = description 

2053 

2054 def get_description(self): 

2055 return self._description 

2056 

2057 def _determine_file_mode(self): 

2058 """Probe the file-system to determine whether permissions can be trusted. 

2059 

2060 Returns: True if permissions can be trusted, False otherwise. 

2061 """ 

2062 return sys.platform != "win32" 

2063 

2064 def _determine_symlinks(self): 

2065 """Probe the file-system to determine whether permissions can be trusted. 

2066 

2067 Returns: True if permissions can be trusted, False otherwise. 

2068 """ 

2069 return sys.platform != "win32" 

2070 

2071 def _put_named_file(self, path, contents) -> None: 

2072 """Write a file to the control dir with the given name and contents. 

2073 

2074 Args: 

2075 path: The path to the file, relative to the control dir. 

2076 contents: A string to write to the file. 

2077 """ 

2078 self._named_files[path] = contents 

2079 

2080 def _del_named_file(self, path) -> None: 

2081 try: 

2082 del self._named_files[path] 

2083 except KeyError: 

2084 pass 

2085 

2086 def get_named_file(self, path, basedir=None): 

2087 """Get a file from the control dir with a specific name. 

2088 

2089 Although the filename should be interpreted as a filename relative to 

2090 the control dir in a disk-baked Repo, the object returned need not be 

2091 pointing to a file in that location. 

2092 

2093 Args: 

2094 path: The path to the file, relative to the control dir. 

2095 Returns: An open file object, or None if the file does not exist. 

2096 """ 

2097 contents = self._named_files.get(path, None) 

2098 if contents is None: 

2099 return None 

2100 return BytesIO(contents) 

2101 

2102 def open_index(self) -> "Index": 

2103 """Fail to open index for this repo, since it is bare. 

2104 

2105 Raises: 

2106 NoIndexPresent: Raised when no index is present 

2107 """ 

2108 raise NoIndexPresent 

2109 

2110 def get_config(self): 

2111 """Retrieve the config object. 

2112 

2113 Returns: `ConfigFile` object. 

2114 """ 

2115 return self._config 

2116 

2117 def get_rebase_state_manager(self): 

2118 """Get the appropriate rebase state manager for this repository. 

2119 

2120 Returns: MemoryRebaseStateManager instance 

2121 """ 

2122 from .rebase import MemoryRebaseStateManager 

2123 

2124 return MemoryRebaseStateManager(self) 

2125 

2126 def get_blob_normalizer(self): 

2127 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2128 from .filters import FilterBlobNormalizer, FilterRegistry 

2129 

2130 # Get GitAttributes object 

2131 git_attributes = self.get_gitattributes() 

2132 config_stack = self.get_config_stack() 

2133 

2134 # Create FilterRegistry with repo reference 

2135 filter_registry = FilterRegistry(config_stack, self) 

2136 

2137 # Return FilterBlobNormalizer which handles all filters 

2138 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

2139 

2140 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes": 

2141 """Read gitattributes for the repository.""" 

2142 from .attrs import GitAttributes 

2143 

2144 # Memory repos don't have working trees or gitattributes files 

2145 # Return empty GitAttributes 

2146 return GitAttributes([]) 

2147 

2148 def do_commit( 

2149 self, 

2150 message: Optional[bytes] = None, 

2151 committer: Optional[bytes] = None, 

2152 author: Optional[bytes] = None, 

2153 commit_timestamp=None, 

2154 commit_timezone=None, 

2155 author_timestamp=None, 

2156 author_timezone=None, 

2157 tree: Optional[ObjectID] = None, 

2158 encoding: Optional[bytes] = None, 

2159 ref: Optional[Ref] = b"HEAD", 

2160 merge_heads: Optional[list[ObjectID]] = None, 

2161 no_verify: bool = False, 

2162 sign: bool = False, 

2163 ): 

2164 """Create a new commit. 

2165 

2166 This is a simplified implementation for in-memory repositories that 

2167 doesn't support worktree operations or hooks. 

2168 

2169 Args: 

2170 message: Commit message 

2171 committer: Committer fullname 

2172 author: Author fullname 

2173 commit_timestamp: Commit timestamp (defaults to now) 

2174 commit_timezone: Commit timestamp timezone (defaults to GMT) 

2175 author_timestamp: Author timestamp (defaults to commit timestamp) 

2176 author_timezone: Author timestamp timezone (defaults to commit timezone) 

2177 tree: SHA1 of the tree root to use 

2178 encoding: Encoding 

2179 ref: Optional ref to commit to (defaults to current branch). 

2180 If None, creates a dangling commit without updating any ref. 

2181 merge_heads: Merge heads 

2182 no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo) 

2183 sign: GPG Sign the commit (ignored for MemoryRepo) 

2184 

2185 Returns: 

2186 New commit SHA1 

2187 """ 

2188 import time 

2189 

2190 from .objects import Commit 

2191 

2192 if tree is None: 

2193 raise ValueError("tree must be specified for MemoryRepo") 

2194 

2195 c = Commit() 

2196 if len(tree) != 40: 

2197 raise ValueError("tree must be a 40-byte hex sha string") 

2198 c.tree = tree 

2199 

2200 config = self.get_config_stack() 

2201 if merge_heads is None: 

2202 merge_heads = [] 

2203 if committer is None: 

2204 committer = get_user_identity(config, kind="COMMITTER") 

2205 check_user_identity(committer) 

2206 c.committer = committer 

2207 if commit_timestamp is None: 

2208 commit_timestamp = time.time() 

2209 c.commit_time = int(commit_timestamp) 

2210 if commit_timezone is None: 

2211 commit_timezone = 0 

2212 c.commit_timezone = commit_timezone 

2213 if author is None: 

2214 author = get_user_identity(config, kind="AUTHOR") 

2215 c.author = author 

2216 check_user_identity(author) 

2217 if author_timestamp is None: 

2218 author_timestamp = commit_timestamp 

2219 c.author_time = int(author_timestamp) 

2220 if author_timezone is None: 

2221 author_timezone = commit_timezone 

2222 c.author_timezone = author_timezone 

2223 if encoding is None: 

2224 try: 

2225 encoding = config.get(("i18n",), "commitEncoding") 

2226 except KeyError: 

2227 pass 

2228 if encoding is not None: 

2229 c.encoding = encoding 

2230 

2231 # Handle message (for MemoryRepo, we don't support callable messages) 

2232 if callable(message): 

2233 message = message(self, c) 

2234 if message is None: 

2235 raise ValueError("Message callback returned None") 

2236 

2237 if message is None: 

2238 raise ValueError("No commit message specified") 

2239 

2240 c.message = message 

2241 

2242 if ref is None: 

2243 # Create a dangling commit 

2244 c.parents = merge_heads 

2245 self.object_store.add_object(c) 

2246 else: 

2247 try: 

2248 old_head = self.refs[ref] 

2249 c.parents = [old_head, *merge_heads] 

2250 self.object_store.add_object(c) 

2251 ok = self.refs.set_if_equals( 

2252 ref, 

2253 old_head, 

2254 c.id, 

2255 message=b"commit: " + message, 

2256 committer=committer, 

2257 timestamp=commit_timestamp, 

2258 timezone=commit_timezone, 

2259 ) 

2260 except KeyError: 

2261 c.parents = merge_heads 

2262 self.object_store.add_object(c) 

2263 ok = self.refs.add_if_new( 

2264 ref, 

2265 c.id, 

2266 message=b"commit: " + message, 

2267 committer=committer, 

2268 timestamp=commit_timestamp, 

2269 timezone=commit_timezone, 

2270 ) 

2271 if not ok: 

2272 from .errors import CommitError 

2273 

2274 raise CommitError(f"{ref!r} changed during commit") 

2275 

2276 return c.id 

2277 

2278 @classmethod 

2279 def init_bare(cls, objects, refs, format: Optional[int] = None): 

2280 """Create a new bare repository in memory. 

2281 

2282 Args: 

2283 objects: Objects for the new repository, 

2284 as iterable 

2285 refs: Refs as dictionary, mapping names 

2286 to object SHA1s 

2287 format: Repository format version (defaults to 0) 

2288 """ 

2289 ret = cls() 

2290 for obj in objects: 

2291 ret.object_store.add_object(obj) 

2292 for refname, sha in refs.items(): 

2293 ret.refs.add_if_new(refname, sha) 

2294 ret._init_files(bare=True, format=format) 

2295 return ret