Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

986 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as public by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 Union, 

46) 

47 

48if TYPE_CHECKING: 

49 # There are no circular imports here, but we try to defer imports as long 

50 # as possible to reduce start-up time for anything that doesn't need 

51 # these imports. 

52 from .config import ConditionMatcher, ConfigFile, StackedConfig 

53 from .index import Index 

54 from .notes import Notes 

55 

56from .errors import ( 

57 CommitError, 

58 HookError, 

59 NoIndexPresent, 

60 NotBlobError, 

61 NotCommitError, 

62 NotGitRepository, 

63 NotTagError, 

64 NotTreeError, 

65 RefFormatError, 

66) 

67from .file import GitFile 

68from .hooks import ( 

69 CommitMsgShellHook, 

70 Hook, 

71 PostCommitShellHook, 

72 PostReceiveShellHook, 

73 PreCommitShellHook, 

74) 

75from .line_ending import BlobNormalizer, TreeBlobNormalizer 

76from .object_store import ( 

77 DiskObjectStore, 

78 MemoryObjectStore, 

79 MissingObjectFinder, 

80 ObjectStoreGraphWalker, 

81 PackBasedObjectStore, 

82 find_shallow, 

83 peel_sha, 

84) 

85from .objects import ( 

86 Blob, 

87 Commit, 

88 ObjectID, 

89 ShaFile, 

90 Tag, 

91 Tree, 

92 check_hexsha, 

93 valid_hexsha, 

94) 

95from .pack import generate_unpacked_objects 

96from .refs import ( 

97 ANNOTATED_TAG_SUFFIX, # noqa: F401 

98 LOCAL_BRANCH_PREFIX, 

99 LOCAL_TAG_PREFIX, # noqa: F401 

100 SYMREF, # noqa: F401 

101 DictRefsContainer, 

102 DiskRefsContainer, 

103 InfoRefsContainer, # noqa: F401 

104 Ref, 

105 RefsContainer, 

106 _set_default_branch, 

107 _set_head, 

108 _set_origin_head, 

109 check_ref_format, # noqa: F401 

110 read_packed_refs, # noqa: F401 

111 read_packed_refs_with_peeled, # noqa: F401 

112 serialize_refs, 

113 write_packed_refs, # noqa: F401 

114) 

115 

# Standard names of files/directories inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
# Worktree-related control files/directories; exact semantics depend on the
# worktree code elsewhere in this module — names mirror git's own layout.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created under the control dir when initializing a repository;
# each entry is a path expressed as its components.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used by default in newly created repositories.
DEFAULT_BRANCH = b"master"

136 

137 

class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        # Keep the offending identity so callers can report or inspect it.
        self.identity = identity

143 

144 

class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised when neither the environment variables nor the system password
    database yield a usable username for the current user.
    """

147 

148 

149# TODO(jelmer): Cache? 

150def _get_default_identity() -> tuple[str, str]: 

151 import socket 

152 

153 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

154 username = os.environ.get(name) 

155 if username: 

156 break 

157 else: 

158 username = None 

159 

160 try: 

161 import pwd 

162 except ImportError: 

163 fullname = None 

164 else: 

165 try: 

166 entry = pwd.getpwuid(os.getuid()) # type: ignore 

167 except KeyError: 

168 fullname = None 

169 else: 

170 if getattr(entry, "gecos", None): 

171 fullname = entry.pw_gecos.split(",")[0] 

172 else: 

173 fullname = None 

174 if username is None: 

175 username = entry.pw_name 

176 if not fullname: 

177 if username is None: 

178 raise DefaultIdentityNotFound("no username found") 

179 fullname = username 

180 email = os.environ.get("EMAIL") 

181 if email is None: 

182 if username is None: 

183 raise DefaultIdentityNotFound("no username found") 

184 email = f"{username}@{socket.gethostname()}" 

185 return (fullname, email) 

186 

187 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      config: Configuration stack to read user.name/user.email from.
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity bytestring of the form ``name <email>``
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    # Only consult the host defaults when something is still missing: the
    # previous unconditional call made commits fail with
    # DefaultIdentityNotFound even when identity was fully configured.
    if user is None or email is None:
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Tolerate an email value already wrapped in angle brackets.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

236 

237 

def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    # A valid identity contains " <", a closing ">" after it, and no
    # NUL or newline bytes anywhere.
    parts = identity.split(b" <", 1)
    if len(parts) != 2:
        raise InvalidUserIdentity(identity)
    if b">" not in parts[1]:
        raise InvalidUserIdentity(identity)
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)

254 

255 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
      <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Every sha on the line must be a valid hex sha.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

287 

288 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    # A commit with no graft parents serializes to just its own sha.
    return b"\n".join(
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    )

308 

309 

310def _set_filesystem_hidden(path) -> None: 

311 """Mark path as to be hidden if supported by platform and filesystem. 

312 

313 On win32 uses SetFileAttributesW api: 

314 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

315 """ 

316 if sys.platform == "win32": 

317 import ctypes 

318 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

319 

320 FILE_ATTRIBUTE_HIDDEN = 2 

321 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

322 ("SetFileAttributesW", ctypes.windll.kernel32) 

323 ) 

324 

325 if isinstance(path, bytes): 

326 path = os.fsdecode(path) 

327 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

328 pass # Could raise or log `ctypes.WinError()` here 

329 

330 # Could implement other platform specific filesystem hiding here 

331 

332 

class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries."""

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a provider.

        Args:
          store: Object store used to look up commit objects.
          grafts: Optional mapping of commit sha -> replacement parent shas.
          shallows: Optional iterable of shallow commit shas; these are
            reported as having no parents.
        """
        self.store = store
        # Use None sentinels instead of mutable default arguments: the old
        # `grafts={}` default was stored by reference, so all instances
        # created without grafts shared (and could corrupt) one dict.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the parent shas for commit_id.

        Grafts take precedence; shallow commits report no parents; otherwise
        the commit graph (if present) or the commit object itself is used.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents

360 

361 

class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """

374 

375 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None: 

376 """Open a repository. 

377 

378 This shouldn't be called directly, but rather through one of the 

379 base classes, such as MemoryRepo or Repo. 

380 

381 Args: 

382 object_store: Object store to use 

383 refs: Refs container to use 

384 """ 

385 self.object_store = object_store 

386 self.refs = refs 

387 

388 self._graftpoints: dict[bytes, list[bytes]] = {} 

389 self.hooks: dict[str, Hook] = {} 

390 

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract hook: concrete subclasses override this; the base
        implementation always raises.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)

397 

398 def _determine_symlinks(self) -> bool: 

399 """Probe the filesystem to determine whether symlinks can be created. 

400 

401 Returns: True if symlinks can be created, False otherwise. 

402 """ 

403 # For now, just mimic the old behaviour 

404 return sys.platform != "win32" 

405 

406 def _init_files( 

407 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

408 ) -> None: 

409 """Initialize a default set of named files.""" 

410 from .config import ConfigFile 

411 

412 self._put_named_file("description", b"Unnamed repository") 

413 f = BytesIO() 

414 cf = ConfigFile() 

415 if format is None: 

416 format = 0 

417 if format not in (0, 1): 

418 raise ValueError(f"Unsupported repository format version: {format}") 

419 cf.set("core", "repositoryformatversion", str(format)) 

420 if self._determine_file_mode(): 

421 cf.set("core", "filemode", True) 

422 else: 

423 cf.set("core", "filemode", False) 

424 

425 if symlinks is None and not bare: 

426 symlinks = self._determine_symlinks() 

427 

428 if symlinks is False: 

429 cf.set("core", "symlinks", symlinks) 

430 

431 cf.set("core", "bare", bare) 

432 cf.set("core", "logallrefupdates", True) 

433 cf.write_to_file(f) 

434 self._put_named_file("config", f.getvalue()) 

435 self._put_named_file(os.path.join("info", "exclude"), b"") 

436 

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Abstract hook: concrete subclasses override this.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

449 

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract hook: concrete subclasses override this.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

458 

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract hook: concrete subclasses override this.
        """
        raise NotImplementedError(self._del_named_file)

462 

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract hook: index-less implementations raise NoIndexPresent.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

471 

472 def fetch( 

473 self, target, determine_wants=None, progress=None, depth: Optional[int] = None 

474 ): 

475 """Fetch objects into another repository. 

476 

477 Args: 

478 target: The target repository 

479 determine_wants: Optional function to determine what refs to 

480 fetch. 

481 progress: Optional progress function 

482 depth: Optional shallow fetch depth 

483 Returns: The local refs 

484 """ 

485 if determine_wants is None: 

486 determine_wants = target.object_store.determine_wants_all 

487 count, pack_data = self.fetch_pack_data( 

488 determine_wants, 

489 target.get_graph_walker(), 

490 progress=progress, 

491 depth=depth, 

492 ) 

493 target.object_store.add_pack_data(count, pack_data, progress) 

494 return self.get_refs() 

495 

496 def fetch_pack_data( 

497 self, 

498 determine_wants, 

499 graph_walker, 

500 progress, 

501 *, 

502 get_tagged=None, 

503 depth: Optional[int] = None, 

504 ): 

505 """Fetch the pack data required for a set of revisions. 

506 

507 Args: 

508 determine_wants: Function that takes a dictionary with heads 

509 and returns the list of heads to fetch. 

510 graph_walker: Object that can iterate over the list of revisions 

511 to fetch and has an "ack" method that will be called to acknowledge 

512 that a revision is present. 

513 progress: Simple progress function that will be called with 

514 updated progress strings. 

515 get_tagged: Function that returns a dict of pointed-to sha -> 

516 tag sha for including tags. 

517 depth: Shallow fetch depth 

518 Returns: count and iterator over pack data 

519 """ 

520 missing_objects = self.find_missing_objects( 

521 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

522 ) 

523 if missing_objects is None: 

524 return 0, iter([]) 

525 remote_has = missing_objects.get_remote_has() 

526 object_ids = list(missing_objects) 

527 return len(object_ids), generate_unpacked_objects( 

528 self.object_store, object_ids, progress=progress, other_haves=remote_has 

529 ) 

530 

    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None in
          the shallow short-circuit case (nothing should be sent).
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before we mutate it below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())
        # NOTE(review): if depth is set but graph_walker lacks a "shallow"
        # attribute, `unshallow` may be unbound below — preserved as-is.

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Stand-in finder that reports zero missing objects.
            class DummyMissingObjectFinder:
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

621 

    def generate_pack_data(
        self,
        have: list[ObjectID],
        want: list[ObjectID],
        progress: Optional[Callable[[str], None]] = None,
        ofs_delta: Optional[bool] = None,
    ):
        """Generate pack data objects for a set of wants/haves.

        Args:
          have: List of SHA1s of objects that should not be sent
          want: List of SHA1s of objects that should be sent
          ofs_delta: Whether OFS deltas can be included
          progress: Optional progress reporting method
        Returns: whatever the object store's generate_pack_data returns
          (count and pack data iterator).
        """
        # Pass along this repo's shallow set so shallow boundaries are
        # honoured when computing the pack contents.
        return self.object_store.generate_pack_data(
            have,
            want,
            shallow=self.get_shallow(),
            progress=progress,
            ofs_delta=ofs_delta,
        )

644 

645 def get_graph_walker( 

646 self, heads: Optional[list[ObjectID]] = None 

647 ) -> ObjectStoreGraphWalker: 

648 """Retrieve a graph walker. 

649 

650 A graph walker is used by a remote repository (or proxy) 

651 to find out which objects are present in this repository. 

652 

653 Args: 

654 heads: Repository heads to use (optional) 

655 Returns: A graph walker object 

656 """ 

657 if heads is None: 

658 heads = [ 

659 sha 

660 for sha in self.refs.as_dict(b"refs/heads").values() 

661 if sha in self.object_store 

662 ] 

663 parents_provider = ParentsProvider(self.object_store) 

664 return ObjectStoreGraphWalker( 

665 heads, 

666 parents_provider.get_parents, 

667 shallow=self.get_shallow(), 

668 update_shallow=self.update_shallow, 

669 ) 

670 

    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        # Delegates to the refs container, which materializes the mapping.
        return self.refs.as_dict()

677 

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD.

        A KeyError from the refs container propagates when HEAD is unset.
        """
        return self.refs[b"HEAD"]

681 

682 def _get_object(self, sha, cls): 

683 assert len(sha) in (20, 40) 

684 ret = self.get_object(sha) 

685 if not isinstance(ret, cls): 

686 if cls is Commit: 

687 raise NotCommitError(ret) 

688 elif cls is Blob: 

689 raise NotBlobError(ret) 

690 elif cls is Tree: 

691 raise NotTreeError(ret) 

692 elif cls is Tag: 

693 raise NotTagError(ret) 

694 else: 

695 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

696 return ret 

697 

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

708 

    def parents_provider(self) -> ParentsProvider:
        """Build a ParentsProvider honouring this repo's grafts and shallows."""
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

715 

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha, to avoid a store lookup
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

728 

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract hook: concrete subclasses override this.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

735 

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract hook: concrete subclasses override this.
        """
        raise NotImplementedError(self.get_worktree_config)

739 

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Abstract hook: concrete subclasses override this.

        Returns: String with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)

747 

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Abstract hook: concrete subclasses override this.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

755 

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Abstract hook: concrete subclasses override this.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)

762 

763 def get_config_stack(self) -> "StackedConfig": 

764 """Return a config stack for this repository. 

765 

766 This stack accesses the configuration for both this repository 

767 itself (.git/config) and the global configuration, which usually 

768 lives in ~/.gitconfig. 

769 

770 Returns: `Config` instance for this repository 

771 """ 

772 from .config import ConfigFile, StackedConfig 

773 

774 local_config = self.get_config() 

775 backends: list[ConfigFile] = [local_config] 

776 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

777 backends.append(self.get_worktree_config()) 

778 

779 backends += StackedConfig.default_backends() 

780 return StackedConfig(backends, writable=local_config) 

781 

782 def get_shallow(self) -> set[ObjectID]: 

783 """Get the set of shallow commits. 

784 

785 Returns: Set of shallow commits. 

786 """ 

787 f = self.get_named_file("shallow") 

788 if f is None: 

789 return set() 

790 with f: 

791 return {line.strip() for line in f} 

792 

793 def update_shallow(self, new_shallow, new_unshallow) -> None: 

794 """Update the list of shallow objects. 

795 

796 Args: 

797 new_shallow: Newly shallow objects 

798 new_unshallow: Newly no longer shallow objects 

799 """ 

800 shallow = self.get_shallow() 

801 if new_shallow: 

802 shallow.update(new_shallow) 

803 if new_unshallow: 

804 shallow.difference_update(new_unshallow) 

805 if shallow: 

806 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

807 else: 

808 self._del_named_file("shallow") 

809 

810 def get_peeled(self, ref: Ref) -> ObjectID: 

811 """Get the peeled value of a ref. 

812 

813 Args: 

814 ref: The refname to peel. 

815 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

816 intermediate tags; if the original ref does not point to a tag, 

817 this will equal the original SHA1. 

818 """ 

819 cached = self.refs.get_peeled(ref) 

820 if cached is not None: 

821 return cached 

822 return peel_sha(self.object_store, self.refs[ref])[1].id 

823 

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        # Deferred import to keep module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)

834 

835 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs): 

836 """Obtain a walker for this repository. 

837 

838 Args: 

839 include: Iterable of SHAs of commits to include along with their 

840 ancestors. Defaults to [HEAD] 

841 

842 Keyword Args: 

843 exclude: Iterable of SHAs of commits to exclude along with their 

844 ancestors, overriding includes. 

845 order: ORDER_* constant specifying the order of results. 

846 Anything other than ORDER_DATE may result in O(n) memory usage. 

847 reverse: If True, reverse the order of output, requiring O(n) 

848 memory. 

849 max_entries: The maximum number of entries to yield, or None for 

850 no limit. 

851 paths: Iterable of file or subtree paths to show entries for. 

852 rename_detector: diff.RenameDetector object for detecting 

853 renames. 

854 follow: If True, follow path across renames/copies. Forces a 

855 default rename_detector. 

856 since: Timestamp to list commits after. 

857 until: Timestamp to list commits before. 

858 queue_cls: A class to use for a queue of commits, supporting the 

859 iterator protocol. The constructor takes a single argument, the 

860 Walker. 

861 

862 Returns: A `Walker` object 

863 """ 

864 from .walk import Walker 

865 

866 if include is None: 

867 include = [self.head()] 

868 

869 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit) 

870 

871 return Walker(self.object_store, include, **kwargs) 

872 

873 def __getitem__(self, name: Union[ObjectID, Ref]): 

874 """Retrieve a Git object by SHA1 or ref. 

875 

876 Args: 

877 name: A Git object SHA1 or a ref name 

878 Returns: A `ShaFile` object, such as a Commit or Blob 

879 Raises: 

880 KeyError: when the specified ref or object does not exist 

881 """ 

882 if not isinstance(name, bytes): 

883 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

884 if len(name) in (20, 40): 

885 try: 

886 return self.object_store[name] 

887 except (KeyError, ValueError): 

888 pass 

889 try: 

890 return self.object_store[self.refs[name]] 

891 except RefFormatError as exc: 

892 raise KeyError(name) from exc 

893 

894 def __contains__(self, name: bytes) -> bool: 

895 """Check if a specific Git object or ref is present. 

896 

897 Args: 

898 name: Git object SHA1 or ref name 

899 """ 

900 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

901 return name in self.object_store or name in self.refs 

902 else: 

903 return name in self.refs 

904 

905 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

906 """Set a ref. 

907 

908 Args: 

909 name: ref name 

910 value: Ref value - either a ShaFile object, or a hex sha 

911 """ 

912 if name.startswith(b"refs/") or name == b"HEAD": 

913 if isinstance(value, ShaFile): 

914 self.refs[name] = value.id 

915 elif isinstance(value, bytes): 

916 self.refs[name] = value 

917 else: 

918 raise TypeError(value) 

919 else: 

920 raise ValueError(name) 

921 

922 def __delitem__(self, name: bytes) -> None: 

923 """Remove a ref. 

924 

925 Args: 

926 name: Name of the ref to remove 

927 """ 

928 if name.startswith(b"refs/") or name == b"HEAD": 

929 del self.refs[name] 

930 else: 

931 raise ValueError(name) 

932 

933 def _get_user_identity( 

934 self, config: "StackedConfig", kind: Optional[str] = None 

935 ) -> bytes: 

936 """Determine the identity to use for new commits.""" 

937 warnings.warn( 

938 "use get_user_identity() rather than Repo._get_user_identity", 

939 DeprecationWarning, 

940 ) 

941 return get_user_identity(config) 

942 

943 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

944 """Add or modify graftpoints. 

945 

946 Args: 

947 updated_graftpoints: Dict of commit shas to list of parent shas 

948 """ 

949 # Simple validation 

950 for commit, parents in updated_graftpoints.items(): 

951 for sha in [commit, *parents]: 

952 check_hexsha(sha, "Invalid graftpoint") 

953 

954 self._graftpoints.update(updated_graftpoints) 

955 

956 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None: 

957 """Remove graftpoints. 

958 

959 Args: 

960 to_remove: List of commit shas 

961 """ 

962 for sha in to_remove: 

963 del self._graftpoints[sha] 

964 

965 def _read_heads(self, name): 

966 f = self.get_named_file(name) 

967 if f is None: 

968 return [] 

969 with f: 

970 return [line.strip() for line in f.readlines() if line.strip()] 

971 

def do_commit(
    self,
    message: Optional[bytes] = None,
    committer: Optional[bytes] = None,
    author: Optional[bytes] = None,
    commit_timestamp=None,
    commit_timezone=None,
    author_timestamp=None,
    author_timezone=None,
    tree: Optional[ObjectID] = None,
    encoding: Optional[bytes] = None,
    ref: Ref = b"HEAD",
    merge_heads: Optional[list[ObjectID]] = None,
    no_verify: bool = False,
    sign: bool = False,
):
    """Create a new commit.

    If not specified, committer and author default to
    get_user_identity(..., 'COMMITTER')
    and get_user_identity(..., 'AUTHOR') respectively.

    Args:
      message: Commit message
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit
        timestamp)
      author_timezone: Author timestamp timezone
        (defaults to commit timestamp timezone)
      tree: SHA1 of the tree root to use (if not specified the
        current index will be committed).
      encoding: Encoding
      ref: Optional ref to commit to (defaults to current branch);
        None creates a dangling commit with no ref update.
      merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
      no_verify: Skip pre-commit and commit-msg hooks
      sign: GPG Sign the commit (bool, defaults to False,
        pass True to use default GPG key,
        pass a str containing Key ID to use a specific GPG key)

    Returns:
      New commit SHA1

    Raises:
      CommitError: If a hook fails or the ref moved concurrently.
      ValueError: If no message is given or ``tree`` is malformed.
    """
    # pre-commit hook: a failing hook aborts the commit; a missing hook
    # is silently ignored.
    try:
        if not no_verify:
            self.hooks["pre-commit"].execute()
    except HookError as exc:
        raise CommitError(exc) from exc
    except KeyError:  # no hook defined, silent fallthrough
        pass

    c = Commit()
    if tree is None:
        # Commit whatever is currently staged in the index.
        index = self.open_index()
        c.tree = index.commit(self.object_store)
    else:
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

    config = self.get_config_stack()
    if merge_heads is None:
        merge_heads = self._read_heads("MERGE_HEAD")
    if committer is None:
        committer = get_user_identity(config, kind="COMMITTER")
    check_user_identity(committer)
    c.committer = committer
    if commit_timestamp is None:
        # FIXME: Support GIT_COMMITTER_DATE environment variable
        commit_timestamp = time.time()
    c.commit_time = int(commit_timestamp)
    if commit_timezone is None:
        # FIXME: Use current user timezone rather than UTC
        commit_timezone = 0
    c.commit_timezone = commit_timezone
    if author is None:
        author = get_user_identity(config, kind="AUTHOR")
    c.author = author
    check_user_identity(author)
    if author_timestamp is None:
        # FIXME: Support GIT_AUTHOR_DATE environment variable
        author_timestamp = commit_timestamp
    c.author_time = int(author_timestamp)
    if author_timezone is None:
        author_timezone = commit_timezone
    c.author_timezone = author_timezone
    if encoding is None:
        try:
            encoding = config.get(("i18n",), "commitEncoding")
        except KeyError:
            pass  # No dice
    if encoding is not None:
        c.encoding = encoding
    if message is None:
        # FIXME: Try to read commit message from .git/MERGE_MSG
        raise ValueError("No commit message specified")

    # commit-msg hook may rewrite the message; None means "unchanged".
    try:
        if no_verify:
            c.message = message
        else:
            c.message = self.hooks["commit-msg"].execute(message)
            if c.message is None:
                c.message = message
    except HookError as exc:
        raise CommitError(exc) from exc
    except KeyError:  # no hook defined, message not modified
        c.message = message

    # sign may be a bool (use default key) or a str key id.
    keyid = sign if isinstance(sign, str) else None

    if ref is None:
        # Create a dangling commit
        c.parents = merge_heads
        if sign:
            c.sign(keyid)
        self.object_store.add_object(c)
    else:
        # Update the ref atomically: compare-and-swap against the head we
        # read, or add-if-new when the ref did not exist yet.  Any KeyError
        # raised inside the try (including from refs lookup) routes to the
        # add_if_new path.
        try:
            old_head = self.refs[ref]
            c.parents = [old_head, *merge_heads]
            if sign:
                c.sign(keyid)
            self.object_store.add_object(c)
            ok = self.refs.set_if_equals(
                ref,
                old_head,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=commit_timestamp,
                timezone=commit_timezone,
            )
        except KeyError:
            c.parents = merge_heads
            if sign:
                c.sign(keyid)
            self.object_store.add_object(c)
            ok = self.refs.add_if_new(
                ref,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=commit_timestamp,
                timezone=commit_timezone,
            )
        if not ok:
            # Fail if the atomic compare-and-swap failed, leaving the
            # commit and all its objects as garbage.
            raise CommitError(f"{ref!r} changed during commit")

    self._del_named_file("MERGE_HEAD")

    # post-commit hook failures are reported but never abort the commit.
    try:
        self.hooks["post-commit"].execute()
    except HookError as e:  # silent failure
        warnings.warn(f"post-commit hook failed: {e}", UserWarning)
    except KeyError:  # no hook defined, silent fallthrough
        pass

    # Trigger auto GC if needed
    from .gc import maybe_auto_gc

    maybe_auto_gc(self)

    return c.id

1140 

1141 

def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    contents = f.read()
    prefix = "gitdir: "
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(prefix):].rstrip("\n")

1155 

1156 

class UnsupportedVersion(Exception):
    """Unsupported repository format version."""

    def __init__(self, version) -> None:
        # Fixed: previously Exception.__init__ was never called, so
        # str(exc) was empty and tracebacks carried no detail.
        super().__init__(f"Unsupported repository format version: {version}")
        #: The offending repositoryformatversion value.
        self.version = version

1162 

1163 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        # Fixed: previously Exception.__init__ was never called, so
        # str(exc) was empty and tracebacks carried no detail.
        super().__init__(f"Unsupported extension: {extension!r}")
        #: The offending extensions.* key name.
        self.extension = extension

1169 

1170 

1171class Repo(BaseRepo): 

1172 """A git repository backed by local disk. 

1173 

1174 To open an existing repository, call the constructor with 

1175 the path of the repository. 

1176 

1177 To create a new repository, use the Repo.init class method. 

1178 

1179 Note that a repository object may hold on to resources such 

1180 as file handles for performance reasons; call .close() to free 

1181 up those resources. 

1182 

1183 Attributes: 

1184 path: Path to the working copy (if it exists) or repository control 

1185 directory (if the repository is bare) 

1186 bare: Whether this is a bare repository 

1187 """ 

1188 

1189 path: str 

1190 bare: bool 

1191 

def __init__(
    self,
    root: Union[str, bytes, os.PathLike],
    object_store: Optional[PackBasedObjectStore] = None,
    bare: Optional[bool] = None,
) -> None:
    """Open a repository on disk.

    Args:
      root: Path to the repository's root.
      object_store: ObjectStore to use; if omitted, we use the
        repository's default object store
      bare: True if this is a bare repository; None to autodetect.

    Raises:
      NotGitRepository: If ``root`` does not look like a git repository.
      UnsupportedVersion: If core.repositoryformatversion is not 0 or 1.
      UnsupportedExtension: If an unknown extensions.* key is configured.
    """
    root = os.fspath(root)
    if isinstance(root, bytes):
        root = os.fsdecode(root)
    hidden_path = os.path.join(root, CONTROLDIR)
    if bare is None:
        # Autodetect: a ".git" entry means non-bare; objects/ and refs/
        # directly under root means bare.
        if os.path.isfile(hidden_path) or os.path.isdir(
            os.path.join(hidden_path, OBJECTDIR)
        ):
            bare = False
        elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
            os.path.join(root, REFSDIR)
        ):
            bare = True
        else:
            raise NotGitRepository(
                "No git repository was found at {path}".format(**dict(path=root))
            )

    self.bare = bare
    if bare is False:
        if os.path.isfile(hidden_path):
            # ".git" is a gitfile pointing at the real control dir
            # (linked worktree / submodule layout).
            with open(hidden_path) as f:
                path = read_gitfile(f)
            self._controldir = os.path.join(root, path)
        else:
            self._controldir = hidden_path
    else:
        self._controldir = root
    # "commondir" (if present) points a linked worktree at the main
    # repository's shared control directory.
    commondir = self.get_named_file(COMMONDIR)
    if commondir is not None:
        with commondir:
            self._commondir = os.path.join(
                self.controldir(),
                os.fsdecode(commondir.read().rstrip(b"\r\n")),
            )
    else:
        self._commondir = self._controldir
    self.path = root

    # Initialize refs early so they're available for config condition matchers
    self.refs = DiskRefsContainer(
        self.commondir(), self._controldir, logger=self._write_reflog
    )

    config = self.get_config()
    try:
        repository_format_version = config.get("core", "repositoryformatversion")
        format_version = (
            0
            if repository_format_version is None
            else int(repository_format_version)
        )
    except KeyError:
        format_version = 0

    if format_version not in (0, 1):
        raise UnsupportedVersion(format_version)

    # Reject any configured extension we do not implement.
    for extension, _value in config.items((b"extensions",)):
        if extension.lower() not in (b"worktreeconfig",):
            raise UnsupportedExtension(extension)

    if object_store is None:
        object_store = DiskObjectStore.from_config(
            os.path.join(self.commondir(), OBJECTDIR), config
        )
    BaseRepo.__init__(self, object_store, self.refs)

    # Graftpoints come from both info/grafts and the shallow file; both
    # map commit shas to replacement parent lists.
    self._graftpoints = {}
    graft_file = self.get_named_file(
        os.path.join("info", "grafts"), basedir=self.commondir()
    )
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))
    graft_file = self.get_named_file("shallow", basedir=self.commondir())
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))

    self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
    self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
    self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
    self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

1290 

def _write_reflog(
    self, ref, old_sha, new_sha, committer, timestamp, timezone, message
) -> None:
    """Append a single entry to the on-disk reflog for ``ref``.

    Missing identity/timestamp/timezone values are filled in from the
    config stack, the current time, and UTC respectively.
    """
    from .reflog import format_reflog_line

    path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
    try:
        os.makedirs(os.path.dirname(path))
    except FileExistsError:
        pass
    if committer is None:
        committer = get_user_identity(self.get_config_stack())
    check_user_identity(committer)
    if timestamp is None:
        timestamp = int(time.time())
    if timezone is None:
        timezone = 0  # FIXME
    entry = format_reflog_line(
        old_sha, new_sha, committer, timestamp, timezone, message
    )
    with open(path, "ab") as f:
        f.write(entry + b"\n")

1316 

@classmethod
def discover(cls, start="."):
    """Iterate parent directories to discover a repository.

    Return a Repo object for the first parent directory that looks like a
    Git repository.

    Args:
      start: The directory to start discovery from (defaults to '.')

    Raises:
      NotGitRepository: If no repository exists in ``start`` or any parent.
    """
    path = os.path.abspath(start)
    while True:
        try:
            return cls(path)
        except NotGitRepository:
            parent, tail = os.path.split(path)
            if not tail:
                # Reached the filesystem root without finding a repository.
                break
            path = parent
    raise NotGitRepository(f"No git repository was found at {start}")

1337 

def controldir(self):
    """Return the path of the control directory (usually ``.git``)."""
    return self._controldir

1341 

def commondir(self):
    """Return the path of the common directory.

    For a main working tree, it is identical to controldir().

    For a linked working tree, it is the control directory of the
    main working tree.
    """
    return self._commondir

1351 

def _determine_file_mode(self):
    """Probe the file-system to determine whether permissions can be trusted.

    Writes a temporary probe file in the working tree, toggles its
    executable bit, and checks whether the change sticks.

    Returns: True if permissions can be trusted, False otherwise.
    """
    fname = os.path.join(self.path, ".probe-permissions")
    with open(fname, "w") as f:
        f.write("")

    try:
        st1 = os.lstat(fname)
        try:
            os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
        except PermissionError:
            return False
        st2 = os.lstat(fname)
    finally:
        # Fixed: the probe file was previously leaked when chmod raised
        # PermissionError (the early return skipped os.unlink).  Always
        # clean up, best-effort.
        try:
            os.unlink(fname)
        except OSError:
            pass

    mode_differs = st1.st_mode != st2.st_mode
    st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0

    return mode_differs and st2_has_exec

1374 

def _determine_symlinks(self):
    """Probe the filesystem to determine whether symlinks can be created.

    Returns: True if symlinks can be created, False otherwise.
    """
    # TODO(jelmer): Actually probe disk / look at filesystem
    # Windows is assumed to lack symlink support; everything else has it.
    return sys.platform != "win32"

1382 

def _put_named_file(self, path, contents) -> None:
    """Write a file to the control dir with the given name and contents.

    Args:
      path: The path to the file, relative to the control dir.
      contents: A string to write to the file.
    """
    relpath = path.lstrip(os.path.sep)
    target = os.path.join(self.controldir(), relpath)
    # GitFile gives us an atomic (write-then-rename) update.
    with GitFile(target, "wb") as f:
        f.write(contents)

1393 

def _del_named_file(self, path) -> None:
    """Delete a file from the control dir; a missing file is a no-op."""
    try:
        os.unlink(os.path.join(self.controldir(), path))
    except FileNotFoundError:
        pass

1399 

def get_named_file(self, path, basedir=None):
    """Get a file from the control dir with a specific name.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-based Repo, the object returned need not be
    pointing to a file in that location.

    Args:
      path: The path to the file, relative to the control dir.
      basedir: Optional argument that specifies an alternative to the
        control dir.
    Returns: An open file object, or None if the file does not exist.
    """
    # TODO(dborowitz): sanitize filenames, since this is used directly by
    # the dumb web serving code.
    base = self.controldir() if basedir is None else basedir
    relpath = path.lstrip(os.path.sep)
    try:
        return open(os.path.join(base, relpath), "rb")
    except FileNotFoundError:
        return None

1422 

def index_path(self):
    """Return path to the index file."""
    control = self.controldir()
    return os.path.join(control, INDEX_FILENAME)

1426 

def open_index(self) -> "Index":
    """Open the index for this repository.

    Honors ``feature.manyFiles`` (which implies index version 4 and
    ``index.skipHash=true`` unless overridden) as well as explicit
    ``index.version`` / ``index.skipHash`` settings.

    Raises:
      NoIndexPresent: If no index is present
    Returns: The matching `Index`
    """
    from .index import Index

    if not self.has_index():
        raise NoIndexPresent

    config = self.get_config_stack()
    many_files = config.get_boolean(b"feature", b"manyFiles", False)

    # Explicit index.version wins; otherwise manyFiles implies v4 and
    # plain repositories leave the version unset.
    try:
        index_version = int(config.get(b"index", b"version"))
    except KeyError:
        index_version = 4 if many_files else None

    # index.skipHash defaults to True only under feature.manyFiles.
    skip_hash = config.get_boolean(b"index", b"skipHash", many_files)

    return Index(self.index_path(), skip_hash=skip_hash, version=index_version)

1463 

def has_index(self) -> bool:
    """Check if an index is present.

    Bare repos must never have index files; non-bare repos may have a
    missing index file, which is treated as empty.
    """
    return not self.bare

1469 

def stage(
    self,
    fs_paths: Union[
        str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
    ],
) -> None:
    """Stage a set of paths.

    For each path: a missing file removes the index entry, a directory is
    handled via index_entry_from_directory (e.g. submodules), unsupported
    file types drop the entry, and regular files/symlinks are normalized,
    stored as blobs and recorded in the index.

    Args:
      fs_paths: List of paths, relative to the repository path

    Raises:
      ValueError: If a given path is absolute.
    """
    root_path_bytes = os.fsencode(self.path)

    # Accept a single path as well as an iterable of paths.
    if isinstance(fs_paths, (str, bytes, os.PathLike)):
        fs_paths = [fs_paths]
    fs_paths = list(fs_paths)

    from .index import (
        _fs_to_tree_path,
        blob_from_path_and_stat,
        index_entry_from_directory,
        index_entry_from_stat,
    )

    index = self.open_index()
    blob_normalizer = self.get_blob_normalizer()
    for fs_path in fs_paths:
        if not isinstance(fs_path, bytes):
            fs_path = os.fsencode(fs_path)
        if os.path.isabs(fs_path):
            raise ValueError(
                f"path {fs_path!r} should be relative to "
                "repository root, not absolute"
            )
        tree_path = _fs_to_tree_path(fs_path)
        full_path = os.path.join(root_path_bytes, fs_path)
        try:
            st = os.lstat(full_path)
        except OSError:
            # File no longer exists
            try:
                del index[tree_path]
            except KeyError:
                pass  # already removed
        else:
            if stat.S_ISDIR(st.st_mode):
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
            elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Not a file type the index can track (fifo, socket, ...):
                # drop any stale entry.
                try:
                    del index[tree_path]
                except KeyError:
                    pass
            else:
                # checkin_normalize applies line-ending / filter rules
                # before the blob is stored.
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
    index.write()

1535 

def unstage(self, fs_paths: list[str]) -> None:
    """Unstage specific file in the index
    Args:
      fs_paths: a list of files to unstage,
        relative to the repository path.

    Raises:
      KeyError: If a path is neither in HEAD's tree nor in the index.
    """
    from .index import IndexEntry, _fs_to_tree_path

    index = self.open_index()
    try:
        tree_id = self[b"HEAD"].tree
    except KeyError:
        # no head mean no commit in the repo
        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            del index[tree_path]
        index.write()
        return

    for fs_path in fs_paths:
        tree_path = _fs_to_tree_path(fs_path)
        try:
            tree = self.object_store[tree_id]
            assert isinstance(tree, Tree)
            tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
        except KeyError:
            # if tree_entry didn't exist, this file was being added, so
            # remove index entry
            try:
                del index[tree_path]
                continue
            except KeyError as exc:
                raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

        # Stat the working-tree file (if it still exists) so the restored
        # index entry keeps plausible metadata.
        st = None
        try:
            st = os.lstat(os.path.join(self.path, fs_path))
        except FileNotFoundError:
            pass

        # Reset the entry to the blob recorded in HEAD's tree.
        index_entry = IndexEntry(
            ctime=(self[b"HEAD"].commit_time, 0),
            mtime=(self[b"HEAD"].commit_time, 0),
            dev=st.st_dev if st else 0,
            ino=st.st_ino if st else 0,
            mode=tree_entry[0],
            uid=st.st_uid if st else 0,
            gid=st.st_gid if st else 0,
            size=len(self[tree_entry[1]].data),
            sha=tree_entry[1],
            flags=0,
            extended_flags=0,
        )

        index[tree_path] = index_entry
    index.write()

1592 

def clone(
    self,
    target_path,
    *,
    mkdir=True,
    bare=False,
    origin=b"origin",
    checkout=None,
    branch=None,
    progress=None,
    depth: Optional[int] = None,
    symlinks=None,
) -> "Repo":
    """Clone this repository.

    Args:
      target_path: Target path
      mkdir: Create the target directory
      bare: Whether to create a bare repository
      checkout: Whether or not to check-out HEAD after cloning
      origin: Base name for refs in target repository
        cloned from this repository
      branch: Optional branch or tag to be used as HEAD in the new repository
        instead of this repository's HEAD.
      progress: Optional progress function
      depth: Depth at which to fetch
      symlinks: Symlinks setting (default to autodetect)
    Returns: Created repository as `Repo`

    Raises:
      ValueError: If both ``bare`` and ``checkout`` are requested.
    """
    encoded_path = os.fsencode(self.path)

    if mkdir:
        os.mkdir(target_path)

    try:
        # checkout defaults to True for non-bare clones and is invalid
        # for bare ones.
        if not bare:
            target = Repo.init(target_path, symlinks=symlinks)
            if checkout is None:
                checkout = True
        else:
            if checkout:
                raise ValueError("checkout and bare are incompatible")
            target = Repo.init_bare(target_path)

        try:
            # Record this repository as the "origin" remote in the clone.
            target_config = target.get_config()
            target_config.set((b"remote", origin), b"url", encoded_path)
            target_config.set(
                (b"remote", origin),
                b"fetch",
                b"+refs/heads/*:refs/remotes/" + origin + b"/*",
            )
            target_config.write_to_path()

            ref_message = b"clone: from " + encoded_path
            self.fetch(target, depth=depth)
            # Mirror branches under refs/remotes/<origin>/ and copy tags.
            target.refs.import_refs(
                b"refs/remotes/" + origin,
                self.refs.as_dict(b"refs/heads"),
                message=ref_message,
            )
            target.refs.import_refs(
                b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
            )

            head_chain, origin_sha = self.refs.follow(b"HEAD")
            origin_head = head_chain[-1] if head_chain else None
            if origin_sha and not origin_head:
                # set detached HEAD
                target.refs[b"HEAD"] = origin_sha
            else:
                _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.reset_index()
        except BaseException:
            target.close()
            raise
    except BaseException:
        # Undo the directory we created so a failed clone leaves nothing
        # behind.
        if mkdir:
            import shutil

            shutil.rmtree(target_path)
        raise
    return target

1687 

def reset_index(self, tree: Optional[bytes] = None):
    """Reset the index back to a specific tree.

    Args:
      tree: Tree SHA to reset to, None for current HEAD tree.
    """
    from .index import (
        build_index_from_tree,
        symlink,
        validate_path_element_default,
        validate_path_element_ntfs,
    )

    if tree is None:
        head = self[b"HEAD"]
        # HEAD may point at an (annotated) tag; peel it to the commit.
        if isinstance(head, Tag):
            _cls, obj = head.object
            head = self.get_object(obj)
        tree = head.tree
    config = self.get_config()
    honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
    # Fixed: the key within the "core" section is "protectNTFS"; the old
    # code looked up core."core.protectNTFS", which can never match a real
    # config entry, so the user's setting was silently ignored and the
    # platform default always won.
    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        validate_path_element = validate_path_element_ntfs
    else:
        validate_path_element = validate_path_element_default
    if config.get_boolean(b"core", b"symlinks", True):
        symlink_fn = symlink
    else:
        # core.symlinks=false: write symlinks as plain files containing
        # the link target instead.
        def symlink_fn(source, target) -> None:  # type: ignore
            with open(
                target, "w" + ("b" if isinstance(source, bytes) else "")
            ) as f:
                f.write(source)

    blob_normalizer = self.get_blob_normalizer()
    return build_index_from_tree(
        self.path,
        self.index_path(),
        self.object_store,
        tree,
        honor_filemode=honor_filemode,
        validate_path_element=validate_path_element,
        symlink_fn=symlink_fn,
        blob_normalizer=blob_normalizer,
    )

1734 

def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
    """Get condition matchers for includeIf conditions.

    Returns a dict of condition prefix to matcher function.
    """
    from pathlib import Path

    from .config import ConditionMatcher, match_glob_pattern

    # Add gitdir matchers
    def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
        # Handle relative patterns (starting with ./)
        if pattern.startswith("./"):
            # Can't handle relative patterns without config directory context
            return False

        # Normalize repository path
        try:
            repo_path = str(Path(self._controldir).resolve())
        except (OSError, ValueError):
            return False

        # Expand ~ in pattern and normalize
        pattern = os.path.expanduser(pattern)

        # Normalize pattern following Git's rules
        pattern = pattern.replace("\\", "/")
        if not pattern.startswith(("~/", "./", "/", "**")):
            # Check for Windows absolute path
            if len(pattern) >= 2 and pattern[1] == ":":
                pass
            else:
                # Bare patterns are anchored anywhere in the path.
                pattern = "**/" + pattern
        if pattern.endswith("/"):
            # A trailing slash matches everything below that directory.
            pattern = pattern + "**"

        # Use the existing _match_gitdir_pattern function
        from .config import _match_gitdir_pattern

        pattern_bytes = pattern.encode("utf-8", errors="replace")
        repo_path_bytes = repo_path.encode("utf-8", errors="replace")

        return _match_gitdir_pattern(
            repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
        )

    # Add onbranch matcher
    def match_onbranch(pattern: str) -> bool:
        try:
            # Get the current branch using refs
            ref_chain, _ = self.refs.follow(b"HEAD")
            head_ref = ref_chain[-1]  # Get the final resolved ref
        except KeyError:
            # No HEAD (e.g. empty repo): condition never matches.
            pass
        else:
            if head_ref and head_ref.startswith(b"refs/heads/"):
                # Extract branch name from ref
                branch = head_ref[11:].decode("utf-8", errors="replace")
                return match_glob_pattern(branch, pattern)
        return False

    matchers: dict[str, ConditionMatcher] = {
        "onbranch:": match_onbranch,
        "gitdir:": lambda pattern: match_gitdir(pattern, True),
        "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
    }

    return matchers

1803 

def get_worktree_config(self) -> "ConfigFile":
    """Return the per-worktree config (``config.worktree``).

    A missing file yields an empty ConfigFile bound to that path, so
    later writes land in the right place.
    """
    from .config import ConfigFile

    path = os.path.join(self.commondir(), "config.worktree")
    try:
        # Pass condition matchers for includeIf evaluation
        matchers = self._get_config_condition_matchers()
        return ConfigFile.from_path(path, condition_matchers=matchers)
    except FileNotFoundError:
        empty = ConfigFile()
        empty.path = path
        return empty

1816 

def get_config(self) -> "ConfigFile":
    """Retrieve the config object.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    """
    from .config import ConfigFile

    path = os.path.join(self._commondir, "config")
    try:
        # Pass condition matchers for includeIf evaluation
        matchers = self._get_config_condition_matchers()
        return ConfigFile.from_path(path, condition_matchers=matchers)
    except FileNotFoundError:
        # No config yet: hand back an empty one bound to the path so
        # later writes end up in the right file.
        empty = ConfigFile()
        empty.path = path
        return empty

1833 

def get_rebase_state_manager(self):
    """Get the appropriate rebase state manager for this repository.

    Returns: DiskRebaseStateManager instance
    """
    from .rebase import DiskRebaseStateManager

    state_path = os.path.join(self.controldir(), "rebase-merge")
    return DiskRebaseStateManager(state_path)

1845 

def get_description(self):
    """Retrieve the description of this repository.

    Returns: A string describing the repository or None.
    """
    try:
        with GitFile(os.path.join(self._controldir, "description"), "rb") as f:
            return f.read()
    except FileNotFoundError:
        return None

1857 

def __repr__(self) -> str:
    """Debug representation including the on-disk path."""
    return "<Repo at {!r}>".format(self.path)

1860 

def set_description(self, description) -> None:
    """Set the description for this repository.

    Args:
      description: Text to set as description for this repository.
    """
    self._put_named_file("description", description)

1868 

@classmethod
def _init_maybe_bare(
    cls,
    path: Union[str, bytes, os.PathLike],
    controldir: Union[str, bytes, os.PathLike],
    bare,
    object_store=None,
    config=None,
    default_branch=None,
    symlinks: Optional[bool] = None,
    format: Optional[int] = None,
):
    """Shared initializer for bare and non-bare repositories.

    Creates the standard control-directory layout under ``controldir``,
    points HEAD at the default branch, and writes the initial control
    files.

    Args:
      path: Working tree (non-bare) or repository (bare) path.
      controldir: Directory to populate with the git control files.
      bare: Whether this is a bare repository.
      object_store: Optional pre-created object store.
      config: Optional config used to look up init.defaultBranch.
      default_branch: Branch name for HEAD; falls back to the configured
        init.defaultBranch, then to DEFAULT_BRANCH.
      symlinks: Symlink support override (None = autodetect).
      format: Repository format version (defaults to 0).
    Returns: The opened `Repo` instance.
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    controldir = os.fspath(controldir)
    if isinstance(controldir, bytes):
        controldir = os.fsdecode(controldir)
    for d in BASE_DIRECTORIES:
        os.mkdir(os.path.join(controldir, *d))
    if object_store is None:
        object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
    ret = cls(path, bare=bare, object_store=object_store)
    if default_branch is None:
        if config is None:
            from .config import StackedConfig

            config = StackedConfig.default()
        try:
            default_branch = config.get("init", "defaultBranch")
        except KeyError:
            default_branch = DEFAULT_BRANCH
    ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
    ret._init_files(bare=bare, symlinks=symlinks, format=format)
    return ret

1904 

@classmethod
def init(
    cls,
    path: Union[str, bytes, os.PathLike],
    *,
    mkdir: bool = False,
    config=None,
    default_branch=None,
    symlinks: Optional[bool] = None,
    format: Optional[int] = None,
) -> "Repo":
    """Create a new repository.

    Args:
      path: Path in which to create the repository
      mkdir: Whether to create the directory
      config: Optional config used to look up init.defaultBranch
      default_branch: Branch name for HEAD (see _init_maybe_bare)
      symlinks: Symlink support override (None = autodetect)
      format: Repository format version (defaults to 0)
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    controldir = os.path.join(path, CONTROLDIR)
    os.mkdir(controldir)
    # Mark ".git" hidden where the OS supports it (e.g. Windows).
    _set_filesystem_hidden(controldir)
    return cls._init_maybe_bare(
        path,
        controldir,
        False,
        config=config,
        default_branch=default_branch,
        symlinks=symlinks,
        format=format,
    )

1941 

@classmethod
def _init_new_working_directory(
    cls,
    path: Union[str, bytes, os.PathLike],
    main_repo,
    identifier=None,
    mkdir=False,
):
    """Create a new working directory linked to a repository.

    Args:
      path: Path in which to create the working tree.
      main_repo: Main repository to reference
      identifier: Worktree identifier
      mkdir: Whether to create the directory
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    if identifier is None:
        identifier = os.path.basename(path)
    # Layout per gitrepository-layout(5): the worktree's ".git" is a
    # gitfile pointing into <main>/.git/worktrees/<id>, which in turn
    # carries "gitdir" (back-pointer) and "commondir" (shared state).
    main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES)
    worktree_controldir = os.path.join(main_worktreesdir, identifier)
    gitdirfile = os.path.join(path, CONTROLDIR)
    with open(gitdirfile, "wb") as f:
        f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
    try:
        os.mkdir(main_worktreesdir)
    except FileExistsError:
        pass
    try:
        os.mkdir(worktree_controldir)
    except FileExistsError:
        pass
    with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdirfile) + b"\n")
    with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
        f.write(b"../..\n")
    # New worktree starts at the main repository's current HEAD.
    with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
        f.write(main_repo.head() + b"\n")
    r = cls(path)
    r.reset_index()
    return r

1988 

1989 @classmethod 

1990 def init_bare( 

1991 cls, 

1992 path: Union[str, bytes, os.PathLike], 

1993 *, 

1994 mkdir=False, 

1995 object_store=None, 

1996 config=None, 

1997 default_branch=None, 

1998 format: Optional[int] = None, 

1999 ): 

2000 """Create a new bare repository. 

2001 

2002 ``path`` should already exist and be an empty directory. 

2003 

2004 Args: 

2005 path: Path to create bare repository in 

2006 format: Repository format version (defaults to 0) 

2007 Returns: a `Repo` instance 

2008 """ 

2009 path = os.fspath(path) 

2010 if isinstance(path, bytes): 

2011 path = os.fsdecode(path) 

2012 if mkdir: 

2013 os.mkdir(path) 

2014 return cls._init_maybe_bare( 

2015 path, 

2016 path, 

2017 True, 

2018 object_store=object_store, 

2019 config=config, 

2020 default_branch=default_branch, 

2021 format=format, 

2022 ) 

2023 

2024 create = init_bare 

2025 

    def close(self) -> None:
        """Close any files opened by this repository.

        Delegates to the object store, which owns the open pack file handles.
        """
        self.object_store.close()

2029 

    def __enter__(self):
        """Enter the context manager; the repository itself is the context value."""
        return self

2032 

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager, closing files opened by the repository."""
        self.close()

2035 

2036 def get_blob_normalizer(self): 

2037 """Return a BlobNormalizer object.""" 

2038 # TODO Parse the git attributes files 

2039 git_attributes = {} 

2040 config_stack = self.get_config_stack() 

2041 try: 

2042 head_sha = self.refs[b"HEAD"] 

2043 # Peel tags to get the underlying commit 

2044 _, obj = peel_sha(self.object_store, head_sha) 

2045 tree = obj.tree 

2046 return TreeBlobNormalizer( 

2047 config_stack, 

2048 git_attributes, 

2049 self.object_store, 

2050 tree, 

2051 ) 

2052 except KeyError: 

2053 return BlobNormalizer(config_stack, git_attributes) 

2054 

2055 def _sparse_checkout_file_path(self) -> str: 

2056 """Return the path of the sparse-checkout file in this repo's control dir.""" 

2057 return os.path.join(self.controldir(), "info", "sparse-checkout") 

2058 

2059 def configure_for_cone_mode(self) -> None: 

2060 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

2061 config = self.get_config() 

2062 config.set((b"core",), b"sparseCheckout", b"true") 

2063 config.set((b"core",), b"sparseCheckoutCone", b"true") 

2064 config.write_to_path() 

2065 

2066 def infer_cone_mode(self) -> bool: 

2067 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

2068 config = self.get_config() 

2069 try: 

2070 sc_cone = config.get((b"core",), b"sparseCheckoutCone") 

2071 return sc_cone == b"true" 

2072 except KeyError: 

2073 # If core.sparseCheckoutCone is not set, default to False 

2074 return False 

2075 

2076 def get_sparse_checkout_patterns(self) -> list[str]: 

2077 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

2078 

2079 Returns: 

2080 A list of patterns. Returns an empty list if the file is missing. 

2081 """ 

2082 path = self._sparse_checkout_file_path() 

2083 try: 

2084 with open(path, encoding="utf-8") as f: 

2085 return [line.strip() for line in f if line.strip()] 

2086 except FileNotFoundError: 

2087 return [] 

2088 

2089 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None: 

2090 """Write the given sparse-checkout patterns into info/sparse-checkout. 

2091 

2092 Creates the info/ directory if it does not exist. 

2093 

2094 Args: 

2095 patterns: A list of gitignore-style patterns to store. 

2096 """ 

2097 info_dir = os.path.join(self.controldir(), "info") 

2098 os.makedirs(info_dir, exist_ok=True) 

2099 

2100 path = self._sparse_checkout_file_path() 

2101 with open(path, "w", encoding="utf-8") as f: 

2102 for pat in patterns: 

2103 f.write(pat + "\n") 

2104 

2105 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None: 

2106 """Write the given cone-mode directory patterns into info/sparse-checkout. 

2107 

2108 For each directory to include, add an inclusion line that "undoes" the prior 

2109 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

2110 Never add the same line twice. 

2111 """ 

2112 patterns = ["/*", "!/*/"] 

2113 if dirs: 

2114 for d in dirs: 

2115 d = d.strip("/") 

2116 line = f"/{d}/" 

2117 if d and line not in patterns: 

2118 patterns.append(line) 

2119 self.set_sparse_checkout_patterns(patterns) 

2120 

2121 

2122class MemoryRepo(BaseRepo): 

2123 """Repo that stores refs, objects, and named files in memory. 

2124 

2125 MemoryRepos are always bare: they have no working tree and no index, since 

2126 those have a stronger dependency on the filesystem. 

2127 """ 

2128 

2129 def __init__(self) -> None: 

2130 """Create a new repository in memory.""" 

2131 from .config import ConfigFile 

2132 

2133 self._reflog: list[Any] = [] 

2134 refs_container = DictRefsContainer({}, logger=self._append_reflog) 

2135 BaseRepo.__init__(self, MemoryObjectStore(), refs_container) # type: ignore 

2136 self._named_files: dict[str, bytes] = {} 

2137 self.bare = True 

2138 self._config = ConfigFile() 

2139 self._description = None 

2140 

    def _append_reflog(self, *args) -> None:
        # Logger callback handed to DictRefsContainer in __init__: record the
        # raw reflog arguments as an opaque tuple for later inspection.
        self._reflog.append(args)

2143 

    def set_description(self, description) -> None:
        """Set the repository description.

        Args:
          description: Description to store for this repository.
        """
        self._description = description

2146 

    def get_description(self):
        """Return the description set via ``set_description``, or None."""
        return self._description

2149 

    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise
            (always False on Windows, True elsewhere).
        """
        return sys.platform != "win32"

2156 

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise
            (always False on Windows, True elsewhere).
        """
        return sys.platform != "win32"

2163 

    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A bytestring to store for the file.
        """
        self._named_files[path] = contents

2172 

2173 def _del_named_file(self, path) -> None: 

2174 try: 

2175 del self._named_files[path] 

2176 except KeyError: 

2177 pass 

2178 

2179 def get_named_file(self, path, basedir=None): 

2180 """Get a file from the control dir with a specific name. 

2181 

2182 Although the filename should be interpreted as a filename relative to 

2183 the control dir in a disk-baked Repo, the object returned need not be 

2184 pointing to a file in that location. 

2185 

2186 Args: 

2187 path: The path to the file, relative to the control dir. 

2188 Returns: An open file object, or None if the file does not exist. 

2189 """ 

2190 contents = self._named_files.get(path, None) 

2191 if contents is None: 

2192 return None 

2193 return BytesIO(contents) 

2194 

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        # Memory repos are always bare, so there is never an index.
        raise NoIndexPresent

2202 

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object (the in-memory config created empty
            in ``__init__``).
        """
        return self._config

2209 

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        # Deferred import, consistent with this module's policy of deferring
        # imports as long as possible (see module header).
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)

2218 

2219 @classmethod 

2220 def init_bare(cls, objects, refs, format: Optional[int] = None): 

2221 """Create a new bare repository in memory. 

2222 

2223 Args: 

2224 objects: Objects for the new repository, 

2225 as iterable 

2226 refs: Refs as dictionary, mapping names 

2227 to object SHA1s 

2228 format: Repository format version (defaults to 0) 

2229 """ 

2230 ret = cls() 

2231 for obj in objects: 

2232 ret.object_store.add_object(obj) 

2233 for refname, sha in refs.items(): 

2234 ret.refs.add_if_new(refname, sha) 

2235 ret._init_files(bare=True, format=format) 

2236 return ret