Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1044 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as public by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 Union, 

46) 

47 

48if TYPE_CHECKING: 

49 # There are no circular imports here, but we try to defer imports as long 

50 # as possible to reduce start-up time for anything that doesn't need 

51 # these imports. 

52 from .attrs import GitAttributes 

53 from .config import ConditionMatcher, ConfigFile, StackedConfig 

54 from .index import Index 

55 from .notes import Notes 

56 

57from .errors import ( 

58 CommitError, 

59 HookError, 

60 NoIndexPresent, 

61 NotBlobError, 

62 NotCommitError, 

63 NotGitRepository, 

64 NotTagError, 

65 NotTreeError, 

66 RefFormatError, 

67) 

68from .file import GitFile 

69from .hooks import ( 

70 CommitMsgShellHook, 

71 Hook, 

72 PostCommitShellHook, 

73 PostReceiveShellHook, 

74 PreCommitShellHook, 

75) 

76from .line_ending import BlobNormalizer, TreeBlobNormalizer 

77from .object_store import ( 

78 DiskObjectStore, 

79 MemoryObjectStore, 

80 MissingObjectFinder, 

81 ObjectStoreGraphWalker, 

82 PackBasedObjectStore, 

83 find_shallow, 

84 peel_sha, 

85) 

86from .objects import ( 

87 Blob, 

88 Commit, 

89 ObjectID, 

90 ShaFile, 

91 Tag, 

92 Tree, 

93 check_hexsha, 

94 valid_hexsha, 

95) 

96from .pack import generate_unpacked_objects 

97from .refs import ( 

98 ANNOTATED_TAG_SUFFIX, # noqa: F401 

99 LOCAL_BRANCH_PREFIX, 

100 LOCAL_TAG_PREFIX, # noqa: F401 

101 SYMREF, # noqa: F401 

102 DictRefsContainer, 

103 DiskRefsContainer, 

104 InfoRefsContainer, # noqa: F401 

105 Ref, 

106 RefsContainer, 

107 _set_default_branch, 

108 _set_head, 

109 _set_origin_head, 

110 check_ref_format, # noqa: F401 

111 read_packed_refs, # noqa: F401 

112 read_packed_refs_with_peeled, # noqa: F401 

113 serialize_refs, 

114 write_packed_refs, # noqa: F401 

115) 

116 

# Name of the control directory of a non-bare repository.
CONTROLDIR = ".git"
# Subdirectory of the control dir containing loose and packed objects.
OBJECTDIR = "objects"
# Subdirectory of the control dir containing references.
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
# Name of the index file inside the control dir.
INDEX_FILENAME = "index"
# Worktree support files: COMMONDIR points at the shared control dir,
# GITDIR points back at the worktree, WORKTREES holds per-worktree dirs.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created inside a fresh control dir, as path components.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used for newly created repositories.
DEFAULT_BRANCH = b"master"

137 

138 

class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        """Remember the offending identity bytestring for inspection."""
        self.identity = identity

144 

145 

class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised by _get_default_identity() when no username can be derived
    from the environment or the passwd database.
    """

148 

149 

150# TODO(jelmer): Cache? 

151def _get_default_identity() -> tuple[str, str]: 

152 import socket 

153 

154 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

155 username = os.environ.get(name) 

156 if username: 

157 break 

158 else: 

159 username = None 

160 

161 try: 

162 import pwd 

163 except ImportError: 

164 fullname = None 

165 else: 

166 try: 

167 entry = pwd.getpwuid(os.getuid()) # type: ignore 

168 except KeyError: 

169 fullname = None 

170 else: 

171 if getattr(entry, "gecos", None): 

172 fullname = entry.pw_gecos.split(",")[0] 

173 else: 

174 fullname = None 

175 if username is None: 

176 username = entry.pw_name 

177 if not fullname: 

178 if username is None: 

179 raise DefaultIdentityNotFound("no username found") 

180 fullname = username 

181 email = os.environ.get("EMAIL") 

182 if email is None: 

183 if username is None: 

184 raise DefaultIdentityNotFound("no username found") 

185 email = f"{username}@{socket.gethostname()}" 

186 return (fullname, email) 

187 

188 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None

    # Environment overrides win when a kind was requested.
    if kind:
        env_name = os.environ.get(f"GIT_{kind}_NAME")
        if env_name is not None:
            user = env_name.encode("utf-8")
        env_email = os.environ.get(f"GIT_{kind}_EMAIL")
        if env_email is not None:
            email = env_email.encode("utf-8")

    # Next, consult the configuration stack.
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None

    # Finally, fall back to the host defaults (computed unconditionally,
    # matching the original behaviour).
    default_user, default_email = _get_default_identity()
    if user is None:
        user = default_user.encode("utf-8")
    if email is None:
        email = default_email.encode("utf-8")

    # Strip a single pair of surrounding angle brackets from the email.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

237 

238 

def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    # Must contain a " <" separator between name and email.
    try:
        _, rest = identity.split(b" <", 1)
    except ValueError as exc:
        raise InvalidUserIdentity(identity) from exc
    # The email part must be closed, and the identity must not embed
    # NUL bytes or newlines (which would corrupt the commit object).
    if b">" not in rest or b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)

255 

256 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts = {}
    for line in graftpoints:
        # First field is the commit; the (optional) remainder lists parents.
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Every sha on the line must be a valid hex sha.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

288 

289 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # A commit with parents becomes "commit p1 p2 ..."; one without
    # parents is serialized as the bare commit sha.
    return b"\n".join(
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    )

309 

310 

def _set_filesystem_hidden(path) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    On other platforms this is currently a no-op.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        # Prototype and bind kernel32!SetFileAttributesW:
        # BOOL SetFileAttributesW(LPCWSTR lpFileName, DWORD dwFileAttributes)
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        # The W-suffixed API takes a unicode path; decode bytes paths first.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Failure is deliberately ignored: hiding the dir is best-effort.
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here

332 

333 

class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries.

    Lookup order in get_parents(): graftpoints override everything,
    shallow commits report no parents, then the commit-graph file (if
    present) is consulted, and finally the commit object itself is read.
    """

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a parents provider.

        Args:
          store: Object store used to read commits and the commit graph.
          grafts: Optional mapping of commit sha -> list of substitute
            parent shas.
          shallows: Optional iterable of shallow commit shas.
        """
        self.store = store
        # Fix: the original used mutable default arguments ({} and []),
        # which would be shared across all instances created without
        # explicit values.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the list of parent shas for commit_id.

        Args:
          commit_id: SHA of the commit to look up.
          commit: Optional pre-loaded commit object matching commit_id,
            used to avoid a store lookup.
        Returns: List of parent SHAs.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents

361 

362 

363class BaseRepo: 

364 """Base class for a git repository. 

365 

366 This base class is meant to be used for Repository implementations that e.g. 

367 work on top of a different transport than a standard filesystem path. 

368 

369 Attributes: 

370 object_store: Dictionary-like object for accessing 

371 the objects 

372 refs: Dictionary-like object with the refs in this 

373 repository 

374 """ 

375 

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints (commit -> substitute parents) start empty and are
        # populated via _add_graftpoints(); hooks are registered by
        # subclasses (e.g. pre-commit / post-commit shell hooks).
        self._graftpoints: dict[bytes, list[bytes]] = {}
        self.hooks: dict[str, Hook] = {}

391 

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract; concrete subclasses must override.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)

398 

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour: assume symlinks work
        # everywhere except on Windows.
        return sys.platform != "win32"

406 

407 def _init_files( 

408 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

409 ) -> None: 

410 """Initialize a default set of named files.""" 

411 from .config import ConfigFile 

412 

413 self._put_named_file("description", b"Unnamed repository") 

414 f = BytesIO() 

415 cf = ConfigFile() 

416 if format is None: 

417 format = 0 

418 if format not in (0, 1): 

419 raise ValueError(f"Unsupported repository format version: {format}") 

420 cf.set("core", "repositoryformatversion", str(format)) 

421 if self._determine_file_mode(): 

422 cf.set("core", "filemode", True) 

423 else: 

424 cf.set("core", "filemode", False) 

425 

426 if symlinks is None and not bare: 

427 symlinks = self._determine_symlinks() 

428 

429 if symlinks is False: 

430 cf.set("core", "symlinks", symlinks) 

431 

432 cf.set("core", "bare", bare) 

433 cf.set("core", "logallrefupdates", True) 

434 cf.write_to_file(f) 

435 self._put_named_file("config", f.getvalue()) 

436 self._put_named_file(os.path.join("info", "exclude"), b"") 

437 

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Abstract; concrete subclasses must override.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

450 

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract; concrete subclasses must override.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

459 

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract; concrete subclasses must override.
        """
        raise NotImplementedError(self._del_named_file)

463 

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract; concrete subclasses must override.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

472 

473 def fetch( 

474 self, target, determine_wants=None, progress=None, depth: Optional[int] = None 

475 ): 

476 """Fetch objects into another repository. 

477 

478 Args: 

479 target: The target repository 

480 determine_wants: Optional function to determine what refs to 

481 fetch. 

482 progress: Optional progress function 

483 depth: Optional shallow fetch depth 

484 Returns: The local refs 

485 """ 

486 if determine_wants is None: 

487 determine_wants = target.object_store.determine_wants_all 

488 count, pack_data = self.fetch_pack_data( 

489 determine_wants, 

490 target.get_graph_walker(), 

491 progress=progress, 

492 depth=depth, 

493 ) 

494 target.object_store.add_pack_data(count, pack_data, progress) 

495 return self.get_refs() 

496 

497 def fetch_pack_data( 

498 self, 

499 determine_wants, 

500 graph_walker, 

501 progress, 

502 *, 

503 get_tagged=None, 

504 depth: Optional[int] = None, 

505 ): 

506 """Fetch the pack data required for a set of revisions. 

507 

508 Args: 

509 determine_wants: Function that takes a dictionary with heads 

510 and returns the list of heads to fetch. 

511 graph_walker: Object that can iterate over the list of revisions 

512 to fetch and has an "ack" method that will be called to acknowledge 

513 that a revision is present. 

514 progress: Simple progress function that will be called with 

515 updated progress strings. 

516 get_tagged: Function that returns a dict of pointed-to sha -> 

517 tag sha for including tags. 

518 depth: Shallow fetch depth 

519 Returns: count and iterator over pack data 

520 """ 

521 missing_objects = self.find_missing_objects( 

522 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

523 ) 

524 if missing_objects is None: 

525 return 0, iter([]) 

526 remote_has = missing_objects.get_remote_has() 

527 object_ids = list(missing_objects) 

528 return len(object_ids), generate_unpacked_objects( 

529 self.object_store, object_ids, progress=progress, other_haves=remote_has 

530 ) 

531 

    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to
            acknowledge that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
          in the shallow short-circuit path where no pack should be sent.
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before we mutate it below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Empty stand-in so callers can still take len() / iterate.
            class DummyMissingObjectFinder:
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            # Adapter: MissingObjectFinder passes commit objects, the
            # provider wants (id, commit).
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

622 

    def generate_pack_data(
        self,
        have: list[ObjectID],
        want: list[ObjectID],
        progress: Optional[Callable[[str], None]] = None,
        ofs_delta: Optional[bool] = None,
    ):
        """Generate pack data objects for a set of wants/haves.

        Delegates to the object store, passing along this repository's
        shallow-commit set so shallow boundaries are respected.

        Args:
          have: List of SHA1s of objects that should not be sent
          want: List of SHA1s of objects that should be sent
          ofs_delta: Whether OFS deltas can be included
          progress: Optional progress reporting method
        """
        return self.object_store.generate_pack_data(
            have,
            want,
            shallow=self.get_shallow(),
            progress=progress,
            ofs_delta=ofs_delta,
        )

645 

646 def get_graph_walker( 

647 self, heads: Optional[list[ObjectID]] = None 

648 ) -> ObjectStoreGraphWalker: 

649 """Retrieve a graph walker. 

650 

651 A graph walker is used by a remote repository (or proxy) 

652 to find out which objects are present in this repository. 

653 

654 Args: 

655 heads: Repository heads to use (optional) 

656 Returns: A graph walker object 

657 """ 

658 if heads is None: 

659 heads = [ 

660 sha 

661 for sha in self.refs.as_dict(b"refs/heads").values() 

662 if sha in self.object_store 

663 ] 

664 parents_provider = ParentsProvider(self.object_store) 

665 return ObjectStoreGraphWalker( 

666 heads, 

667 parents_provider.get_parents, 

668 shallow=self.get_shallow(), 

669 update_shallow=self.update_shallow, 

670 ) 

671 

    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

678 

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD.

        Raises:
          KeyError: if HEAD does not resolve (e.g. unborn branch).
        """
        return self.refs[b"HEAD"]

682 

683 def _get_object(self, sha, cls): 

684 assert len(sha) in (20, 40) 

685 ret = self.get_object(sha) 

686 if not isinstance(ret, cls): 

687 if cls is Commit: 

688 raise NotCommitError(ret) 

689 elif cls is Blob: 

690 raise NotBlobError(ret) 

691 elif cls is Tree: 

692 raise NotTreeError(ret) 

693 elif cls is Tag: 

694 raise NotTagError(ret) 

695 else: 

696 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

697 return ret 

698 

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

709 

    def parents_provider(self) -> ParentsProvider:
        """Build a ParentsProvider wired to this repository's store.

        The provider honours this repository's graftpoints and shallow
        commits when resolving parents.
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

716 

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

729 

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract; concrete subclasses must override.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

736 

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract; concrete subclasses must override.
        """
        raise NotImplementedError(self.get_worktree_config)

740 

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Abstract; concrete subclasses must override.

        Returns: String with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)

748 

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Abstract; concrete subclasses must override.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

756 

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Abstract; concrete subclasses must override.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)

763 

764 def get_config_stack(self) -> "StackedConfig": 

765 """Return a config stack for this repository. 

766 

767 This stack accesses the configuration for both this repository 

768 itself (.git/config) and the global configuration, which usually 

769 lives in ~/.gitconfig. 

770 

771 Returns: `Config` instance for this repository 

772 """ 

773 from .config import ConfigFile, StackedConfig 

774 

775 local_config = self.get_config() 

776 backends: list[ConfigFile] = [local_config] 

777 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

778 backends.append(self.get_worktree_config()) 

779 

780 backends += StackedConfig.default_backends() 

781 return StackedConfig(backends, writable=local_config) 

782 

783 def get_shallow(self) -> set[ObjectID]: 

784 """Get the set of shallow commits. 

785 

786 Returns: Set of shallow commits. 

787 """ 

788 f = self.get_named_file("shallow") 

789 if f is None: 

790 return set() 

791 with f: 

792 return {line.strip() for line in f} 

793 

794 def update_shallow(self, new_shallow, new_unshallow) -> None: 

795 """Update the list of shallow objects. 

796 

797 Args: 

798 new_shallow: Newly shallow objects 

799 new_unshallow: Newly no longer shallow objects 

800 """ 

801 shallow = self.get_shallow() 

802 if new_shallow: 

803 shallow.update(new_shallow) 

804 if new_unshallow: 

805 shallow.difference_update(new_unshallow) 

806 if shallow: 

807 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

808 else: 

809 self._del_named_file("shallow") 

810 

811 def get_peeled(self, ref: Ref) -> ObjectID: 

812 """Get the peeled value of a ref. 

813 

814 Args: 

815 ref: The refname to peel. 

816 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

817 intermediate tags; if the original ref does not point to a tag, 

818 this will equal the original SHA1. 

819 """ 

820 cached = self.refs.get_peeled(ref) 

821 if cached is not None: 

822 return cached 

823 return peel_sha(self.object_store, self.refs[ref])[1].id 

824 

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        # Deferred import keeps module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)

835 

    def get_walker(self, include: Optional[list[bytes]] = None, **kwargs):
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]

        Keyword Args:
          exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
          order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
          reverse: If True, reverse the order of output, requiring O(n)
            memory.
          max_entries: The maximum number of entries to yield, or None for
            no limit.
          paths: Iterable of file or subtree paths to show entries for.
          rename_detector: diff.RenameDetector object for detecting
            renames.
          follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
          since: Timestamp to list commits after.
          until: Timestamp to list commits before.
          queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument, the
            Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker

        if include is None:
            include = [self.head()]

        # Route parent lookups through this repo so grafts/shallows apply.
        kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)

        return Walker(self.object_store, include, **kwargs)

873 

    def __getitem__(self, name: Union[ObjectID, Ref]):
        """Retrieve a Git object by SHA1 or ref.

        Args:
          name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
          KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # SHA-shaped names (20-byte binary / 40-byte hex) are tried as
        # direct object lookups first.
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                pass
        # Fall back to resolving the name as a ref.
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            raise KeyError(name) from exc

894 

895 def __contains__(self, name: bytes) -> bool: 

896 """Check if a specific Git object or ref is present. 

897 

898 Args: 

899 name: Git object SHA1 or ref name 

900 """ 

901 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

902 return name in self.object_store or name in self.refs 

903 else: 

904 return name in self.refs 

905 

906 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

907 """Set a ref. 

908 

909 Args: 

910 name: ref name 

911 value: Ref value - either a ShaFile object, or a hex sha 

912 """ 

913 if name.startswith(b"refs/") or name == b"HEAD": 

914 if isinstance(value, ShaFile): 

915 self.refs[name] = value.id 

916 elif isinstance(value, bytes): 

917 self.refs[name] = value 

918 else: 

919 raise TypeError(value) 

920 else: 

921 raise ValueError(name) 

922 

923 def __delitem__(self, name: bytes) -> None: 

924 """Remove a ref. 

925 

926 Args: 

927 name: Name of the ref to remove 

928 """ 

929 if name.startswith(b"refs/") or name == b"HEAD": 

930 del self.refs[name] 

931 else: 

932 raise ValueError(name) 

933 

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated: use the module-level get_user_identity() instead.

        NOTE(review): the *kind* argument is accepted but not forwarded to
        get_user_identity(), so GIT_<KIND>_NAME/EMAIL are not consulted
        here — presumably intentional for this deprecated shim; verify
        against callers before changing.
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

943 

    def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
        """Add or modify graftpoints.

        Args:
          updated_graftpoints: Dict of commit shas to list of parent shas
        """
        # Simple validation: every sha must be valid hex before we mutate
        # our own state.
        for commit, parents in updated_graftpoints.items():
            for sha in [commit, *parents]:
                check_hexsha(sha, "Invalid graftpoint")

        self._graftpoints.update(updated_graftpoints)

956 

957 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None: 

958 """Remove graftpoints. 

959 

960 Args: 

961 to_remove: List of commit shas 

962 """ 

963 for sha in to_remove: 

964 del self._graftpoints[sha] 

965 

966 def _read_heads(self, name): 

967 f = self.get_named_file(name) 

968 if f is None: 

969 return [] 

970 with f: 

971 return [line.strip() for line in f.readlines() if line.strip()] 

972 

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1

        Raises:
          CommitError: if a hook raises HookError or the ref changed
            during the commit (compare-and-swap failure).
          ValueError: if no message is given or ``tree`` is malformed.
        """
        # Run the pre-commit hook first; a HookError aborts the commit,
        # a missing hook is simply skipped.
        try:
            if not no_verify:
                self.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        # Either commit the current index as a new tree, or use the caller's
        # explicit tree sha (validated as 40 hex chars).
        if tree is None:
            index = self.open_index()
            c.tree = index.commit(self.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            # Pending merges recorded by a prior merge become extra parents.
            merge_heads = self._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # The commit-msg hook may rewrite the message; None means "unchanged".
        try:
            if no_verify:
                c.message = message
            else:
                c.message = self.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        # Check if we should sign the commit
        should_sign = sign
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            config = self.get_config_stack()
            try:
                should_sign = config.get_boolean((b"commit",), b"gpgSign")
            except KeyError:
                should_sign = False  # Default to not signing if no config
        keyid = sign if isinstance(sign, str) else None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            if should_sign:
                c.sign(keyid)
            self.object_store.add_object(c)
        else:
            # Update the ref with an atomic compare-and-swap so concurrent
            # commits to the same ref cannot silently clobber each other.
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                if should_sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet (e.g. first commit on a branch).
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        # The merge (if any) is now concluded; clear the pending state.
        self._del_named_file("MERGE_HEAD")

        try:
            self.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self)

        return c.id

1151 

1152 

def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(prefix):].rstrip("\n")

1166 

1167 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        # Pass a readable message to Exception so str(exc) is informative;
        # previously args were empty and str(exc) was "".
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1173 

1174 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        # Pass a readable message to Exception so str(exc) is informative;
        # previously args were empty and str(exc) was "".
        super().__init__(f"Unsupported extension: {extension!r}")
        self.extension = extension

1180 

1181 

1182class Repo(BaseRepo): 

1183 """A git repository backed by local disk. 

1184 

1185 To open an existing repository, call the constructor with 

1186 the path of the repository. 

1187 

1188 To create a new repository, use the Repo.init class method. 

1189 

1190 Note that a repository object may hold on to resources such 

1191 as file handles for performance reasons; call .close() to free 

1192 up those resources. 

1193 

1194 Attributes: 

1195 path: Path to the working copy (if it exists) or repository control 

1196 directory (if the repository is bare) 

1197 bare: Whether this is a bare repository 

1198 """ 

1199 

1200 path: str 

1201 bare: bool 

1202 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if ``bare`` is None and ``root`` does not look
            like either a bare or a non-bare repository.
          UnsupportedVersion: if core.repositoryformatversion is not 0 or 1.
          UnsupportedExtension: if an unknown ``extensions.*`` key is set.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        # Auto-detect bare-ness: a ".git" file/dir marks a non-bare repo;
        # objects/ and refs/ directly under root mark a bare repo.
        if bare is None:
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            # A ".git" *file* (worktree/submodule layout) points at the real
            # control directory; otherwise ".git" is the control dir itself.
            if os.path.isfile(hidden_path):
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # "commondir" (linked worktrees) redirects shared state to the main
        # repository's control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Missing key means the default (version 0).
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any extension we do not understand makes the repo unsafe
                # to operate on.
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints: both info/grafts and the shallow file rewrite commit
        # parentage; merge them into one mapping.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

1314 

1315 def _write_reflog( 

1316 self, ref, old_sha, new_sha, committer, timestamp, timezone, message 

1317 ) -> None: 

1318 from .reflog import format_reflog_line 

1319 

1320 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1321 try: 

1322 os.makedirs(os.path.dirname(path)) 

1323 except FileExistsError: 

1324 pass 

1325 if committer is None: 

1326 config = self.get_config_stack() 

1327 committer = get_user_identity(config) 

1328 check_user_identity(committer) 

1329 if timestamp is None: 

1330 timestamp = int(time.time()) 

1331 if timezone is None: 

1332 timezone = 0 # FIXME 

1333 with open(path, "ab") as f: 

1334 f.write( 

1335 format_reflog_line( 

1336 old_sha, new_sha, committer, timestamp, timezone, message 

1337 ) 

1338 + b"\n" 

1339 ) 

1340 

1341 def read_reflog(self, ref): 

1342 """Read reflog entries for a reference. 

1343 

1344 Args: 

1345 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1346 

1347 Yields: 

1348 reflog.Entry objects in chronological order (oldest first) 

1349 """ 

1350 from .reflog import read_reflog 

1351 

1352 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1353 try: 

1354 with open(path, "rb") as f: 

1355 yield from read_reflog(f) 

1356 except FileNotFoundError: 

1357 return 

1358 

1359 @classmethod 

1360 def discover(cls, start="."): 

1361 """Iterate parent directories to discover a repository. 

1362 

1363 Return a Repo object for the first parent directory that looks like a 

1364 Git repository. 

1365 

1366 Args: 

1367 start: The directory to start discovery from (defaults to '.') 

1368 """ 

1369 remaining = True 

1370 path = os.path.abspath(start) 

1371 while remaining: 

1372 try: 

1373 return cls(path) 

1374 except NotGitRepository: 

1375 path, remaining = os.path.split(path) 

1376 raise NotGitRepository( 

1377 "No git repository was found at {path}".format(**dict(path=start)) 

1378 ) 

1379 

1380 def controldir(self): 

1381 """Return the path of the control directory.""" 

1382 return self._controldir 

1383 

1384 def commondir(self): 

1385 """Return the path of the common directory. 

1386 

1387 For a main working tree, it is identical to controldir(). 

1388 

1389 For a linked working tree, it is the control directory of the 

1390 main working tree. 

1391 """ 

1392 return self._commondir 

1393 

1394 def _determine_file_mode(self): 

1395 """Probe the file-system to determine whether permissions can be trusted. 

1396 

1397 Returns: True if permissions can be trusted, False otherwise. 

1398 """ 

1399 fname = os.path.join(self.path, ".probe-permissions") 

1400 with open(fname, "w") as f: 

1401 f.write("") 

1402 

1403 st1 = os.lstat(fname) 

1404 try: 

1405 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1406 except PermissionError: 

1407 return False 

1408 st2 = os.lstat(fname) 

1409 

1410 os.unlink(fname) 

1411 

1412 mode_differs = st1.st_mode != st2.st_mode 

1413 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1414 

1415 return mode_differs and st2_has_exec 

1416 

1417 def _determine_symlinks(self): 

1418 """Probe the filesystem to determine whether symlinks can be created. 

1419 

1420 Returns: True if symlinks can be created, False otherwise. 

1421 """ 

1422 # TODO(jelmer): Actually probe disk / look at filesystem 

1423 return sys.platform != "win32" 

1424 

1425 def _put_named_file(self, path, contents) -> None: 

1426 """Write a file to the control dir with the given name and contents. 

1427 

1428 Args: 

1429 path: The path to the file, relative to the control dir. 

1430 contents: A string to write to the file. 

1431 """ 

1432 path = path.lstrip(os.path.sep) 

1433 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1434 f.write(contents) 

1435 

1436 def _del_named_file(self, path) -> None: 

1437 try: 

1438 os.unlink(os.path.join(self.controldir(), path)) 

1439 except FileNotFoundError: 

1440 return 

1441 

1442 def get_named_file(self, path, basedir=None): 

1443 """Get a file from the control dir with a specific name. 

1444 

1445 Although the filename should be interpreted as a filename relative to 

1446 the control dir in a disk-based Repo, the object returned need not be 

1447 pointing to a file in that location. 

1448 

1449 Args: 

1450 path: The path to the file, relative to the control dir. 

1451 basedir: Optional argument that specifies an alternative to the 

1452 control dir. 

1453 Returns: An open file object, or None if the file does not exist. 

1454 """ 

1455 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1456 # the dumb web serving code. 

1457 if basedir is None: 

1458 basedir = self.controldir() 

1459 path = path.lstrip(os.path.sep) 

1460 try: 

1461 return open(os.path.join(basedir, path), "rb") 

1462 except FileNotFoundError: 

1463 return None 

1464 

1465 def index_path(self): 

1466 """Return path to the index file.""" 

1467 return os.path.join(self.controldir(), INDEX_FILENAME) 

1468 

1469 def open_index(self) -> "Index": 

1470 """Open the index for this repository. 

1471 

1472 Raises: 

1473 NoIndexPresent: If no index is present 

1474 Returns: The matching `Index` 

1475 """ 

1476 from .index import Index 

1477 

1478 if not self.has_index(): 

1479 raise NoIndexPresent 

1480 

1481 # Check for manyFiles feature configuration 

1482 config = self.get_config_stack() 

1483 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1484 skip_hash = False 

1485 index_version = None 

1486 

1487 if many_files: 

1488 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1489 try: 

1490 index_version_str = config.get(b"index", b"version") 

1491 index_version = int(index_version_str) 

1492 except KeyError: 

1493 index_version = 4 # Default to version 4 for manyFiles 

1494 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1495 else: 

1496 # Check for explicit index settings 

1497 try: 

1498 index_version_str = config.get(b"index", b"version") 

1499 index_version = int(index_version_str) 

1500 except KeyError: 

1501 index_version = None 

1502 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1503 

1504 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1505 

1506 def has_index(self) -> bool: 

1507 """Check if an index is present.""" 

1508 # Bare repos must never have index files; non-bare repos may have a 

1509 # missing index file, which is treated as empty. 

1510 return not self.bare 

1511 

    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        For each path: a vanished file is removed from the index; a
        directory is staged only if it yields an index entry (e.g. a
        submodule — TODO confirm against index_entry_from_directory);
        other non-regular, non-symlink files are removed; regular files
        and symlinks are normalized, stored as blobs and indexed.

        Args:
          fs_paths: List of paths, relative to the repository path
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path or any iterable of paths.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self.open_index()
        blob_normalizer = self.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except OSError:
                # File no longer exists
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Sockets, fifos, devices etc. cannot be stored in git.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    # Regular file or symlink: normalize (e.g. line endings),
                    # store the blob, and record its stat info in the index.
                    blob = blob_from_path_and_stat(full_path, st)
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        # Persist all updates in one write after processing every path.
        index.write()

1577 

    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific file in the index
        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: if a path is neither in HEAD's tree nor in the index.
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self.open_index()
        try:
            tree_id = self[b"HEAD"].tree
        except KeyError:
            # no head mean no commit in the repo
            # With no commits, unstaging just drops the index entries.
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # Stat the working-tree file (if present) so dev/ino/uid/gid in
            # the restored index entry match the file on disk.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            # Rebuild the entry from HEAD's tree: mode and sha come from the
            # committed state, timestamps from the HEAD commit.
            index_entry = IndexEntry(
                ctime=(self[b"HEAD"].commit_time, 0),
                mtime=(self[b"HEAD"].commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self[tree_entry[1]].data),
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

1634 

    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
          ValueError: if both ``bare`` and ``checkout`` are requested.
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        # Outer try: if anything fails and we created the directory, remove
        # it again so a failed clone leaves nothing behind.
        try:
            # Checkout defaults to True for non-bare clones and is invalid
            # for bare ones.
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            # Inner try: close the target repo on failure so file handles
            # are released before the directory is removed.
            try:
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror branches under refs/remotes/<origin>/ and copy tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                    head_ref = _set_default_branch(
                        target.refs, origin, origin_head, branch, ref_message
                    )

                    # Update target head
                    if head_ref:
                        head = _set_head(target.refs, head_ref, ref_message)
                    else:
                        head = None

                    if checkout and head is not None:
                        target.reset_index()
            except BaseException:
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1729 

1730 def reset_index(self, tree: Optional[bytes] = None): 

1731 """Reset the index back to a specific tree. 

1732 

1733 Args: 

1734 tree: Tree SHA to reset to, None for current HEAD tree. 

1735 """ 

1736 from .index import ( 

1737 build_index_from_tree, 

1738 symlink, 

1739 validate_path_element_default, 

1740 validate_path_element_hfs, 

1741 validate_path_element_ntfs, 

1742 ) 

1743 

1744 if tree is None: 

1745 head = self[b"HEAD"] 

1746 if isinstance(head, Tag): 

1747 _cls, obj = head.object 

1748 head = self.get_object(obj) 

1749 tree = head.tree 

1750 config = self.get_config() 

1751 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt") 

1752 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"): 

1753 validate_path_element = validate_path_element_ntfs 

1754 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"): 

1755 validate_path_element = validate_path_element_hfs 

1756 else: 

1757 validate_path_element = validate_path_element_default 

1758 if config.get_boolean(b"core", b"symlinks", True): 

1759 symlink_fn = symlink 

1760 else: 

1761 

1762 def symlink_fn(source, target) -> None: # type: ignore 

1763 with open( 

1764 target, "w" + ("b" if isinstance(source, bytes) else "") 

1765 ) as f: 

1766 f.write(source) 

1767 

1768 blob_normalizer = self.get_blob_normalizer() 

1769 return build_index_from_tree( 

1770 self.path, 

1771 self.index_path(), 

1772 self.object_store, 

1773 tree, 

1774 honor_filemode=honor_filemode, 

1775 validate_path_element=validate_path_element, 

1776 symlink_fn=symlink_fn, 

1777 blob_normalizer=blob_normalizer, 

1778 ) 

1779 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Matches this repository's control directory against a gitdir:
            # includeIf pattern.
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # A bare pattern matches at any depth.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash means "this directory and everything below".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            # True when HEAD resolves to a local branch whose short name
            # matches the glob pattern; detached HEAD never matches.
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1848 

1849 def get_worktree_config(self) -> "ConfigFile": 

1850 from .config import ConfigFile 

1851 

1852 path = os.path.join(self.commondir(), "config.worktree") 

1853 try: 

1854 # Pass condition matchers for includeIf evaluation 

1855 condition_matchers = self._get_config_condition_matchers() 

1856 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1857 except FileNotFoundError: 

1858 cf = ConfigFile() 

1859 cf.path = path 

1860 return cf 

1861 

1862 def get_config(self) -> "ConfigFile": 

1863 """Retrieve the config object. 

1864 

1865 Returns: `ConfigFile` object for the ``.git/config`` file. 

1866 """ 

1867 from .config import ConfigFile 

1868 

1869 path = os.path.join(self._commondir, "config") 

1870 try: 

1871 # Pass condition matchers for includeIf evaluation 

1872 condition_matchers = self._get_config_condition_matchers() 

1873 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1874 except FileNotFoundError: 

1875 ret = ConfigFile() 

1876 ret.path = path 

1877 return ret 

1878 

1879 def get_rebase_state_manager(self): 

1880 """Get the appropriate rebase state manager for this repository. 

1881 

1882 Returns: DiskRebaseStateManager instance 

1883 """ 

1884 import os 

1885 

1886 from .rebase import DiskRebaseStateManager 

1887 

1888 path = os.path.join(self.controldir(), "rebase-merge") 

1889 return DiskRebaseStateManager(path) 

1890 

1891 def get_description(self): 

1892 """Retrieve the description of this repository. 

1893 

1894 Returns: A string describing the repository or None. 

1895 """ 

1896 path = os.path.join(self._controldir, "description") 

1897 try: 

1898 with GitFile(path, "rb") as f: 

1899 return f.read() 

1900 except FileNotFoundError: 

1901 return None 

1902 

1903 def __repr__(self) -> str: 

1904 return f"<Repo at {self.path!r}>" 

1905 

1906 def set_description(self, description) -> None: 

1907 """Set the description for this repository. 

1908 

1909 Args: 

1910 description: Text to set as description for this repository. 

1911 """ 

1912 self._put_named_file("description", description) 

1913 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Creates the standard control-directory layout, the object store,
        a symbolic HEAD pointing at the default branch, and the initial
        control files.

        Args:
          path: Working tree path (same as ``controldir`` for bare repos).
          controldir: Directory to populate with the git control files.
          bare: Whether the repository is bare.
          object_store: Optional pre-created object store.
          config: Optional config used to look up init.defaultBranch.
          default_branch: Branch HEAD should point to; falls back to
            init.defaultBranch, then DEFAULT_BRANCH.
          symlinks: Symlink support setting forwarded to _init_files.
          format: Repository format version forwarded to _init_files.
        Returns: The opened repository instance.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        # Create the standard subdirectory skeleton (objects, refs, ...).
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # HEAD starts as a symbolic ref to the (possibly unborn) branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1949 

    @classmethod
    def init(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir: bool = False,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Create a new repository.

        Args:
          path: Path in which to create the repository
          mkdir: Whether to create the directory
          config: Optional config used to look up init.defaultBranch.
          default_branch: Branch HEAD should point to (defaults to the
            configured init.defaultBranch).
          symlinks: Symlink support setting (default autodetect).
          format: Repository format version (defaults to 0)
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # Non-bare layout: the control directory (".git") lives inside the
        # working tree and is marked hidden where the platform supports it.
        controldir = os.path.join(path, CONTROLDIR)
        os.mkdir(controldir)
        _set_filesystem_hidden(controldir)
        return cls._init_maybe_bare(
            path,
            controldir,
            False,
            config=config,
            default_branch=default_branch,
            symlinks=symlinks,
            format=format,
        )

1986 

@classmethod
def _init_new_working_directory(
    cls,
    path: Union[str, bytes, os.PathLike],
    main_repo,
    identifier=None,
    mkdir=False,
):
    """Create a new working directory linked to a repository.

    Args:
      path: Path in which to create the working tree.
      main_repo: Main repository to reference
      identifier: Worktree identifier (defaults to the basename of ``path``)
      mkdir: Whether to create the directory
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    if identifier is None:
        identifier = os.path.basename(path)
    # The worktree's private control dir lives under the main repo's
    # "worktrees" directory, keyed by the identifier.
    main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES)
    worktree_controldir = os.path.join(main_worktreesdir, identifier)
    # In a linked worktree, ".git" is a file pointing at that control dir.
    gitdirfile = os.path.join(path, CONTROLDIR)
    with open(gitdirfile, "wb") as f:
        f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
    try:
        os.mkdir(main_worktreesdir)
    except FileExistsError:
        pass
    try:
        os.mkdir(worktree_controldir)
    except FileExistsError:
        pass
    # Back-pointer from the control dir to the worktree's ".git" file.
    with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdirfile) + b"\n")
    # The shared control dir is two levels up
    # (<controldir>/worktrees/<identifier>).
    with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
        f.write(b"../..\n")
    # Start the new worktree at the main repository's current HEAD.
    with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
        f.write(main_repo.head() + b"\n")
    r = cls(path)
    # Populate the worktree's index from HEAD.
    r.reset_index()
    return r

2033 

@classmethod
def init_bare(
    cls,
    path: Union[str, bytes, os.PathLike],
    *,
    mkdir=False,
    object_store=None,
    config=None,
    default_branch=None,
    format: Optional[int] = None,
):
    """Create a new bare repository.

    ``path`` should already exist and be an empty directory (unless
    ``mkdir`` is given).

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create ``path`` first
      object_store: Optional object store to use
      config: Optional config consulted for ``init.defaultBranch``
      default_branch: Branch name HEAD should point at
      format: Repository format version (defaults to 0)
    Returns: a `Repo` instance
    """
    repo_path = os.fspath(path)
    if isinstance(repo_path, bytes):
        repo_path = os.fsdecode(repo_path)
    if mkdir:
        os.mkdir(repo_path)
    # A bare repository keeps its control files directly in ``path``,
    # so path and controldir coincide.
    return cls._init_maybe_bare(
        repo_path,
        repo_path,
        True,
        object_store=object_store,
        config=config,
        default_branch=default_branch,
        format=format,
    )

create = init_bare

2070 

def close(self) -> None:
    """Release resources held by this repository.

    Closes the underlying object store (open pack files etc.).
    """
    self.object_store.close()

2074 

def __enter__(self):
    """Enter a context; the repository itself is the context object."""
    return self

2077 

def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit the context, closing the repository; exceptions propagate."""
    self.close()

2080 

def get_blob_normalizer(self):
    """Return a BlobNormalizer object.

    Returns a ``TreeBlobNormalizer`` bound to the tree of the commit HEAD
    resolves to, or a plain ``BlobNormalizer`` when HEAD is unborn.
    """
    # TODO Parse the git attributes files
    git_attributes = {}
    config_stack = self.get_config_stack()
    try:
        head_sha = self.refs[b"HEAD"]
        # Peel tags to get the underlying commit
        _, obj = peel_sha(self.object_store, head_sha)
        tree = obj.tree
        return TreeBlobNormalizer(
            config_stack,
            git_attributes,
            self.object_store,
            tree,
        )
    except KeyError:
        # No HEAD (e.g. freshly initialized repo): normalize without
        # tree context.
        return BlobNormalizer(config_stack, git_attributes)

2099 

def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Combines patterns from the top-level ``.gitattributes`` in the given
    tree (or HEAD's tree when ``tree`` is None) with
    ``.git/info/attributes``; the info file's patterns are appended last.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    patterns = []

    # Read system gitattributes (TODO: implement this)
    # Read global gitattributes (TODO: implement this)

    # Read repository .gitattributes from index/tree
    if tree is None:
        try:
            # Try to get from HEAD
            head = self[b"HEAD"]
            if isinstance(head, Tag):
                # Peel an annotated tag down to the object it points at.
                _cls, obj = head.object
                head = self.get_object(obj)
            tree = head.tree
        except KeyError:
            # No HEAD, no attributes from tree
            pass

    if tree is not None:
        try:
            tree_obj = self[tree]
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    attrs_data = BytesIO(attrs_blob.data)
                    for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                        pattern = Pattern(pattern_bytes)
                        patterns.append((pattern, attrs))
        except (KeyError, NotTreeError):
            # Tree is missing or the SHA does not name a tree; treat as
            # having no tree-level .gitattributes.
            pass

    # Read .git/info/attributes
    info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
    if os.path.exists(info_attrs_path):
        with open(info_attrs_path, "rb") as f:
            for pattern_bytes, attrs in parse_git_attributes(f):
                pattern = Pattern(pattern_bytes)
                patterns.append((pattern, attrs))

    return GitAttributes(patterns)

2156 

def _sparse_checkout_file_path(self) -> str:
    """Return the path of the sparse-checkout file in this repo's control dir."""
    return os.path.join(self.controldir(), "info", "sparse-checkout")

2160 

def configure_for_cone_mode(self) -> None:
    """Ensure the repository is configured for cone-mode sparse-checkout.

    Sets ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` to
    ``true`` and persists the repository config.
    """
    cfg = self.get_config()
    for name in (b"sparseCheckout", b"sparseCheckoutCone"):
        cfg.set((b"core",), name, b"true")
    cfg.write_to_path()

2167 

def infer_cone_mode(self) -> bool:
    """Whether cone-mode sparse checkout is enabled.

    Returns:
      True iff ``core.sparseCheckoutCone`` is set to ``true`` in the
      repository config; False when unset or set to anything else.
    """
    try:
        value = self.get_config().get((b"core",), b"sparseCheckoutCone")
    except KeyError:
        # Unset means plain (non-cone) sparse checkout.
        return False
    return value == b"true"

2177 

def get_sparse_checkout_patterns(self) -> list[str]:
    """Return the patterns stored in info/sparse-checkout.

    Returns:
      The file's non-blank lines with surrounding whitespace stripped;
      an empty list if the file is missing.
    """
    try:
        with open(self._sparse_checkout_file_path(), encoding="utf-8") as f:
            raw_lines = f.readlines()
    except FileNotFoundError:
        return []
    stripped = (line.strip() for line in raw_lines)
    return [line for line in stripped if line]

2190 

def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
    """Write the given sparse-checkout patterns into info/sparse-checkout.

    Creates the info/ directory if it does not exist.

    Args:
      patterns: A list of gitignore-style patterns to store.
    """
    # The info/ directory may not exist yet in a fresh repository.
    os.makedirs(os.path.join(self.controldir(), "info"), exist_ok=True)
    with open(self._sparse_checkout_file_path(), "w", encoding="utf-8") as f:
        f.writelines(pat + "\n" for pat in patterns)

2206 

def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
    """Write cone-mode directory patterns into info/sparse-checkout.

    Starts from the cone-mode base (include top-level files via ``/*``,
    exclude all directories via ``!/*/``), then appends one re-inclusion
    line per requested directory.  The same line is never added twice.
    """
    patterns = ["/*", "!/*/"]
    for entry in dirs or []:
        name = entry.strip("/")
        if not name:
            continue
        line = f"/{name}/"
        if line not in patterns:
            patterns.append(line)
    self.set_sparse_checkout_patterns(patterns)

2222 

2223 

class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # In-memory reflog; the refs container appends entries via
        # _append_reflog.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        # Maps control-dir-relative paths to file contents (bytes).
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None

2242 

def _append_reflog(self, *args) -> None:
    """Record a ref update (as the raw argument tuple) in the in-memory reflog."""
    self._reflog.append(args)

2245 

def set_description(self, description) -> None:
    """Set the repository description (kept in memory only)."""
    self._description = description

2248 

def get_description(self):
    """Return the description previously set, or None."""
    return self._description

2251 

def _determine_file_mode(self):
    """Probe the file-system to determine whether permissions can be trusted.

    Returns: True if permissions can be trusted, False otherwise.
    """
    # Windows does not preserve POSIX file modes.
    return sys.platform != "win32"

2258 

def _determine_symlinks(self):
    """Probe the file-system to determine whether symlinks can be created.

    Returns: True if symlinks can be created, False otherwise.
    """
    # Symlink creation is restricted on Windows.
    return sys.platform != "win32"

2265 

def _put_named_file(self, path, contents) -> None:
    """Write a file to the control dir with the given name and contents.

    Args:
      path: The path to the file, relative to the control dir.
      contents: A string to write to the file.
    """
    self._named_files[path] = contents

2274 

def _del_named_file(self, path) -> None:
    """Delete a named file from the in-memory control dir.

    Missing files are silently ignored.
    """
    self._named_files.pop(path, None)

2280 

def get_named_file(self, path, basedir=None):
    """Get a file from the control dir with a specific name.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-backed Repo, the object returned need not be
    pointing to a file in that location.

    Args:
      path: The path to the file, relative to the control dir.
      basedir: Ignored for in-memory repositories.
    Returns: An open file object, or None if the file does not exist.
    """
    contents = self._named_files.get(path, None)
    if contents is None:
        return None
    return BytesIO(contents)

2296 

def open_index(self) -> "Index":
    """Fail to open index for this repo, since it is bare.

    Raises:
      NoIndexPresent: Always; memory repositories have no index.
    """
    raise NoIndexPresent

2304 

def get_config(self):
    """Retrieve the config object.

    Returns: `ConfigFile` object.
    """
    return self._config

2311 

def get_rebase_state_manager(self):
    """Get the appropriate rebase state manager for this repository.

    Returns: MemoryRebaseStateManager instance
    """
    # Imported lazily to avoid a module-level import cycle.
    from .rebase import MemoryRebaseStateManager

    return MemoryRebaseStateManager(self)

2320 

@classmethod
def init_bare(cls, objects, refs, format: Optional[int] = None):
    """Create a new bare repository in memory.

    Args:
      objects: Objects for the new repository, as iterable
      refs: Refs as dictionary, mapping names to object SHA1s
      format: Repository format version (defaults to 0)
    """
    repo = cls()
    store = repo.object_store
    for obj in objects:
        store.add_object(obj)
    for refname, sha in refs.items():
        repo.refs.add_if_new(refname, sha)
    repo._init_files(bare=True, format=format)
    return repo