Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1001 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence 

38from io import BytesIO 

39from types import TracebackType 

40from typing import ( 

41 TYPE_CHECKING, 

42 Any, 

43 BinaryIO, 

44 Optional, 

45 TypeVar, 

46) 

47 

48if TYPE_CHECKING: 

49 # There are no circular imports here, but we try to defer imports as long 

50 # as possible to reduce start-up time for anything that doesn't need 

51 # these imports. 

52 from .attrs import GitAttributes 

53 from .config import ConditionMatcher, ConfigFile, StackedConfig 

54 from .diff_tree import RenameDetector 

55 from .filters import FilterBlobNormalizer, FilterContext 

56 from .index import Index 

57 from .notes import Notes 

58 from .object_store import BaseObjectStore, GraphWalker 

59 from .pack import UnpackedObject 

60 from .rebase import RebaseStateManager 

61 from .walk import Walker 

62 from .worktree import WorkTree 

63 

64from . import reflog, replace_me 

65from .errors import ( 

66 NoIndexPresent, 

67 NotBlobError, 

68 NotCommitError, 

69 NotGitRepository, 

70 NotTagError, 

71 NotTreeError, 

72 RefFormatError, 

73) 

74from .file import GitFile 

75from .hooks import ( 

76 CommitMsgShellHook, 

77 Hook, 

78 PostCommitShellHook, 

79 PostReceiveShellHook, 

80 PreCommitShellHook, 

81) 

82from .object_store import ( 

83 DiskObjectStore, 

84 MemoryObjectStore, 

85 MissingObjectFinder, 

86 ObjectStoreGraphWalker, 

87 PackBasedObjectStore, 

88 PackCapableObjectStore, 

89 find_shallow, 

90 peel_sha, 

91) 

92from .objects import ( 

93 Blob, 

94 Commit, 

95 ObjectID, 

96 ShaFile, 

97 Tag, 

98 Tree, 

99 check_hexsha, 

100 valid_hexsha, 

101) 

102from .pack import generate_unpacked_objects 

103from .refs import ( 

104 ANNOTATED_TAG_SUFFIX, # noqa: F401 

105 LOCAL_TAG_PREFIX, # noqa: F401 

106 SYMREF, # noqa: F401 

107 DictRefsContainer, 

108 DiskRefsContainer, 

109 InfoRefsContainer, # noqa: F401 

110 Ref, 

111 RefsContainer, 

112 _set_default_branch, 

113 _set_head, 

114 _set_origin_head, 

115 check_ref_format, # noqa: F401 

116 extract_branch_name, 

117 is_per_worktree_ref, 

118 local_branch_name, 

119 read_packed_refs, # noqa: F401 

120 read_packed_refs_with_peeled, # noqa: F401 

121 serialize_refs, 

122 write_packed_refs, # noqa: F401 

123) 

124 

# Well-known names inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
# Whether generated packs may use OFS_DELTA entries by default.
DEFAULT_OFS_DELTA = True

# Type variable for typed object lookups, constrained to ShaFile subclasses.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
# File/directory names used by linked worktrees.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Subdirectories created inside a freshly initialized control directory.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Name of the branch created by default in new repositories.
DEFAULT_BRANCH = b"master"

148 

149 

class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
            identity: The malformed identity string.
        """
        # Forward the identity to Exception so str(exc) is informative;
        # previously args stayed empty and str(exc) was "".
        super().__init__(identity)
        self.identity = identity

156 

157 

class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised when neither the environment variables nor the system password
    database yield a usable username for the current user.
    """

160 

161 

162# TODO(jelmer): Cache? 

163def _get_default_identity() -> tuple[str, str]: 

164 import socket 

165 

166 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

167 username = os.environ.get(name) 

168 if username: 

169 break 

170 else: 

171 username = None 

172 

173 try: 

174 import pwd 

175 except ImportError: 

176 fullname = None 

177 else: 

178 try: 

179 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore] 

180 except KeyError: 

181 fullname = None 

182 else: 

183 if getattr(entry, "gecos", None): 

184 fullname = entry.pw_gecos.split(",")[0] 

185 else: 

186 fullname = None 

187 if username is None: 

188 username = entry.pw_name 

189 if not fullname: 

190 if username is None: 

191 raise DefaultIdentityNotFound("no username found") 

192 fullname = username 

193 email = os.environ.get("EMAIL") 

194 if email is None: 

195 if username is None: 

196 raise DefaultIdentityNotFound("no username found") 

197 email = f"{username}@{socket.gethostname()}" 

198 return (fullname, email) 

199 

200 

201def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes: 

202 """Determine the identity to use for new commits. 

203 

204 If kind is set, this first checks 

205 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL. 

206 

207 If those variables are not set, then it will fall back 

208 to reading the user.name and user.email settings from 

209 the specified configuration. 

210 

211 If that also fails, then it will fall back to using 

212 the current users' identity as obtained from the host 

213 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f). 

214 

215 Args: 

216 config: Configuration stack to read from 

217 kind: Optional kind to return identity for, 

218 usually either "AUTHOR" or "COMMITTER". 

219 

220 Returns: 

221 A user identity 

222 """ 

223 user: bytes | None = None 

224 email: bytes | None = None 

225 if kind: 

226 user_uc = os.environ.get("GIT_" + kind + "_NAME") 

227 if user_uc is not None: 

228 user = user_uc.encode("utf-8") 

229 email_uc = os.environ.get("GIT_" + kind + "_EMAIL") 

230 if email_uc is not None: 

231 email = email_uc.encode("utf-8") 

232 if user is None: 

233 try: 

234 user = config.get(("user",), "name") 

235 except KeyError: 

236 user = None 

237 if email is None: 

238 try: 

239 email = config.get(("user",), "email") 

240 except KeyError: 

241 email = None 

242 default_user, default_email = _get_default_identity() 

243 if user is None: 

244 user = default_user.encode("utf-8") 

245 if email is None: 

246 email = default_email.encode("utf-8") 

247 if email.startswith(b"<") and email.endswith(b">"): 

248 email = email[1:-1] 

249 return user + b" <" + email + b">" 

250 

251 

def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``Name <email>`` and may contain neither
    NUL bytes nor newlines.

    Args:
        identity: User identity bytestring

    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        # No " <" separator present at all.
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    malformed = (
        b">" not in remainder or b"\0" in identity or b"\n" in identity
    )
    if malformed:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))

268 

269 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
        graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []
        # Validate every sha on the line before recording it.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")
        grafts[commit] = parents
    return grafts

301 

302 

def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

322 

323 

def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    On every other platform this is currently a no-op.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        # Build a typed prototype for kernel32!SetFileAttributesW so ctypes
        # marshals the unicode path and DWORD flags correctly.
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        # The W-suffixed API takes a unicode path; decode bytes paths first.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Failure is deliberately ignored: hiding the directory is purely
        # cosmetic and not worth aborting repository creation for.
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here

345 

346 

347class ParentsProvider: 

348 """Provider for commit parent information.""" 

349 

350 def __init__( 

351 self, 

352 store: "BaseObjectStore", 

353 grafts: dict[bytes, list[bytes]] = {}, 

354 shallows: Iterable[bytes] = [], 

355 ) -> None: 

356 """Initialize ParentsProvider. 

357 

358 Args: 

359 store: Object store to use 

360 grafts: Graft information 

361 shallows: Shallow commit SHAs 

362 """ 

363 self.store = store 

364 self.grafts = grafts 

365 self.shallows = set(shallows) 

366 

367 # Get commit graph once at initialization for performance 

368 self.commit_graph = store.get_commit_graph() 

369 

370 def get_parents( 

371 self, commit_id: bytes, commit: Commit | None = None 

372 ) -> list[bytes]: 

373 """Get parents for a commit using the parents provider.""" 

374 try: 

375 return self.grafts[commit_id] 

376 except KeyError: 

377 pass 

378 if commit_id in self.shallows: 

379 return [] 

380 

381 # Try to use commit graph for faster parent lookup 

382 if self.commit_graph: 

383 parents = self.commit_graph.get_parents(commit_id) 

384 if parents is not None: 

385 return parents 

386 

387 # Fallback to reading the commit object 

388 if commit is None: 

389 obj = self.store[commit_id] 

390 assert isinstance(obj, Commit) 

391 commit = obj 

392 parents = commit.parents 

393 assert isinstance(parents, list) 

394 return parents 

395 

396 

397class BaseRepo: 

398 """Base class for a git repository. 

399 

400 This base class is meant to be used for Repository implementations that e.g. 

401 work on top of a different transport than a standard filesystem path. 

402 

403 Attributes: 

404 object_store: Dictionary-like object for accessing 

405 the objects 

406 refs: Dictionary-like object with the refs in this 

407 repository 

408 """ 

409 

410 def __init__( 

411 self, object_store: "PackCapableObjectStore", refs: RefsContainer 

412 ) -> None: 

413 """Open a repository. 

414 

415 This shouldn't be called directly, but rather through one of the 

416 base classes, such as MemoryRepo or Repo. 

417 

418 Args: 

419 object_store: Object store to use 

420 refs: Refs container to use 

421 """ 

422 self.object_store = object_store 

423 self.refs = refs 

424 

425 self._graftpoints: dict[bytes, list[bytes]] = {} 

426 self.hooks: dict[str, Hook] = {} 

427 

428 def _determine_file_mode(self) -> bool: 

429 """Probe the file-system to determine whether permissions can be trusted. 

430 

431 Returns: True if permissions can be trusted, False otherwise. 

432 """ 

433 raise NotImplementedError(self._determine_file_mode) 

434 

435 def _determine_symlinks(self) -> bool: 

436 """Probe the filesystem to determine whether symlinks can be created. 

437 

438 Returns: True if symlinks can be created, False otherwise. 

439 """ 

440 # For now, just mimic the old behaviour 

441 return sys.platform != "win32" 

442 

443 def _init_files( 

444 self, bare: bool, symlinks: bool | None = None, format: int | None = None 

445 ) -> None: 

446 """Initialize a default set of named files.""" 

447 from .config import ConfigFile 

448 

449 self._put_named_file("description", b"Unnamed repository") 

450 f = BytesIO() 

451 cf = ConfigFile() 

452 if format is None: 

453 format = 0 

454 if format not in (0, 1): 

455 raise ValueError(f"Unsupported repository format version: {format}") 

456 cf.set("core", "repositoryformatversion", str(format)) 

457 if self._determine_file_mode(): 

458 cf.set("core", "filemode", True) 

459 else: 

460 cf.set("core", "filemode", False) 

461 

462 if symlinks is None and not bare: 

463 symlinks = self._determine_symlinks() 

464 

465 if symlinks is False: 

466 cf.set("core", "symlinks", symlinks) 

467 

468 cf.set("core", "bare", bare) 

469 cf.set("core", "logallrefupdates", True) 

470 cf.write_to_file(f) 

471 self._put_named_file("config", f.getvalue()) 

472 self._put_named_file(os.path.join("info", "exclude"), b"") 

473 

474 def get_named_file(self, path: str) -> BinaryIO | None: 

475 """Get a file from the control dir with a specific name. 

476 

477 Although the filename should be interpreted as a filename relative to 

478 the control dir in a disk-based Repo, the object returned need not be 

479 pointing to a file in that location. 

480 

481 Args: 

482 path: The path to the file, relative to the control dir. 

483 Returns: An open file object, or None if the file does not exist. 

484 """ 

485 raise NotImplementedError(self.get_named_file) 

486 

487 def _put_named_file(self, path: str, contents: bytes) -> None: 

488 """Write a file to the control dir with the given name and contents. 

489 

490 Args: 

491 path: The path to the file, relative to the control dir. 

492 contents: A string to write to the file. 

493 """ 

494 raise NotImplementedError(self._put_named_file) 

495 

496 def _del_named_file(self, path: str) -> None: 

497 """Delete a file in the control directory with the given name.""" 

498 raise NotImplementedError(self._del_named_file) 

499 

500 def open_index(self) -> "Index": 

501 """Open the index for this repository. 

502 

503 Raises: 

504 NoIndexPresent: If no index is present 

505 Returns: The matching `Index` 

506 """ 

507 raise NotImplementedError(self.open_index) 

508 

509 def fetch( 

510 self, 

511 target: "BaseRepo", 

512 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]] 

513 | None = None, 

514 progress: Callable[..., None] | None = None, 

515 depth: int | None = None, 

516 ) -> dict[bytes, bytes]: 

517 """Fetch objects into another repository. 

518 

519 Args: 

520 target: The target repository 

521 determine_wants: Optional function to determine what refs to 

522 fetch. 

523 progress: Optional progress function 

524 depth: Optional shallow fetch depth 

525 Returns: The local refs 

526 """ 

527 if determine_wants is None: 

528 determine_wants = target.object_store.determine_wants_all 

529 count, pack_data = self.fetch_pack_data( 

530 determine_wants, 

531 target.get_graph_walker(), 

532 progress=progress, 

533 depth=depth, 

534 ) 

535 target.object_store.add_pack_data(count, pack_data, progress) 

536 return self.get_refs() 

537 

538 def fetch_pack_data( 

539 self, 

540 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]], 

541 graph_walker: "GraphWalker", 

542 progress: Callable[[bytes], None] | None, 

543 *, 

544 get_tagged: Callable[[], dict[bytes, bytes]] | None = None, 

545 depth: int | None = None, 

546 ) -> tuple[int, Iterator["UnpackedObject"]]: 

547 """Fetch the pack data required for a set of revisions. 

548 

549 Args: 

550 determine_wants: Function that takes a dictionary with heads 

551 and returns the list of heads to fetch. 

552 graph_walker: Object that can iterate over the list of revisions 

553 to fetch and has an "ack" method that will be called to acknowledge 

554 that a revision is present. 

555 progress: Simple progress function that will be called with 

556 updated progress strings. 

557 get_tagged: Function that returns a dict of pointed-to sha -> 

558 tag sha for including tags. 

559 depth: Shallow fetch depth 

560 Returns: count and iterator over pack data 

561 """ 

562 missing_objects = self.find_missing_objects( 

563 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

564 ) 

565 if missing_objects is None: 

566 return 0, iter([]) 

567 remote_has = missing_objects.get_remote_has() 

568 object_ids = list(missing_objects) 

569 return len(object_ids), generate_unpacked_objects( 

570 self.object_store, object_ids, progress=progress, other_haves=remote_has 

571 ) 

572 

    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[bytes, bytes]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to
                acknowledge that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth

        Returns: MissingObjectFinder over the objects to send, or None in
            the shallow short-circuit case where no pack should be sent.
        """
        # Peeled/serialized refs are what determine_wants gets to choose from.
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before we mutate it below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            # NOTE(review): if the walker lacks a ``shallow`` attribute on a
            # depth-limited fetch, ``unshallow`` stays unbound below — confirm
            # all depth-capable walkers expose ``shallow``.
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Look up *commit*'s parents via the shallow-aware provider."""
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

668 

669 def generate_pack_data( 

670 self, 

671 have: set[ObjectID], 

672 want: set[ObjectID], 

673 *, 

674 shallow: set[ObjectID] | None = None, 

675 progress: Callable[[str], None] | None = None, 

676 ofs_delta: bool | None = None, 

677 ) -> tuple[int, Iterator["UnpackedObject"]]: 

678 """Generate pack data objects for a set of wants/haves. 

679 

680 Args: 

681 have: List of SHA1s of objects that should not be sent 

682 want: List of SHA1s of objects that should be sent 

683 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits) 

684 ofs_delta: Whether OFS deltas can be included 

685 progress: Optional progress reporting method 

686 """ 

687 if shallow is None: 

688 shallow = self.get_shallow() 

689 return self.object_store.generate_pack_data( 

690 have, 

691 want, 

692 shallow=shallow, 

693 progress=progress, 

694 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA, 

695 ) 

696 

697 def get_graph_walker( 

698 self, heads: list[ObjectID] | None = None 

699 ) -> ObjectStoreGraphWalker: 

700 """Retrieve a graph walker. 

701 

702 A graph walker is used by a remote repository (or proxy) 

703 to find out which objects are present in this repository. 

704 

705 Args: 

706 heads: Repository heads to use (optional) 

707 Returns: A graph walker object 

708 """ 

709 if heads is None: 

710 heads = [ 

711 sha 

712 for sha in self.refs.as_dict(b"refs/heads").values() 

713 if sha in self.object_store 

714 ] 

715 parents_provider = ParentsProvider(self.object_store) 

716 return ObjectStoreGraphWalker( 

717 heads, 

718 parents_provider.get_parents, 

719 shallow=self.get_shallow(), 

720 update_shallow=self.update_shallow, 

721 ) 

722 

723 def get_refs(self) -> dict[bytes, bytes]: 

724 """Get dictionary with all refs. 

725 

726 Returns: A ``dict`` mapping ref names to SHA1s 

727 """ 

728 return self.refs.as_dict() 

729 

730 def head(self) -> bytes: 

731 """Return the SHA1 pointed at by HEAD.""" 

732 # TODO: move this method to WorkTree 

733 return self.refs[b"HEAD"] 

734 

735 def _get_object(self, sha: bytes, cls: type[T]) -> T: 

736 assert len(sha) in (20, 40) 

737 ret = self.get_object(sha) 

738 if not isinstance(ret, cls): 

739 if cls is Commit: 

740 raise NotCommitError(ret.id) 

741 elif cls is Blob: 

742 raise NotBlobError(ret.id) 

743 elif cls is Tree: 

744 raise NotTreeError(ret.id) 

745 elif cls is Tag: 

746 raise NotTagError(ret.id) 

747 else: 

748 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

749 return ret 

750 

751 def get_object(self, sha: bytes) -> ShaFile: 

752 """Retrieve the object with the specified SHA. 

753 

754 Args: 

755 sha: SHA to retrieve 

756 Returns: A ShaFile object 

757 Raises: 

758 KeyError: when the object can not be found 

759 """ 

760 return self.object_store[sha] 

761 

762 def parents_provider(self) -> ParentsProvider: 

763 """Get a parents provider for this repository. 

764 

765 Returns: 

766 ParentsProvider instance configured with grafts and shallows 

767 """ 

768 return ParentsProvider( 

769 self.object_store, 

770 grafts=self._graftpoints, 

771 shallows=self.get_shallow(), 

772 ) 

773 

774 def get_parents(self, sha: bytes, commit: Commit | None = None) -> list[bytes]: 

775 """Retrieve the parents of a specific commit. 

776 

777 If the specific commit is a graftpoint, the graft parents 

778 will be returned instead. 

779 

780 Args: 

781 sha: SHA of the commit for which to retrieve the parents 

782 commit: Optional commit matching the sha 

783 Returns: List of parents 

784 """ 

785 return self.parents_provider().get_parents(sha, commit) 

786 

787 def get_config(self) -> "ConfigFile": 

788 """Retrieve the config object. 

789 

790 Returns: `ConfigFile` object for the ``.git/config`` file. 

791 """ 

792 raise NotImplementedError(self.get_config) 

793 

794 def get_worktree_config(self) -> "ConfigFile": 

795 """Retrieve the worktree config object.""" 

796 raise NotImplementedError(self.get_worktree_config) 

797 

798 def get_description(self) -> bytes | None: 

799 """Retrieve the description for this repository. 

800 

801 Returns: Bytes with the description of the repository 

802 as set by the user. 

803 """ 

804 raise NotImplementedError(self.get_description) 

805 

806 def set_description(self, description: bytes) -> None: 

807 """Set the description for this repository. 

808 

809 Args: 

810 description: Text to set as description for this repository. 

811 """ 

812 raise NotImplementedError(self.set_description) 

813 

814 def get_rebase_state_manager(self) -> "RebaseStateManager": 

815 """Get the appropriate rebase state manager for this repository. 

816 

817 Returns: RebaseStateManager instance 

818 """ 

819 raise NotImplementedError(self.get_rebase_state_manager) 

820 

821 def get_blob_normalizer(self) -> "FilterBlobNormalizer": 

822 """Return a BlobNormalizer object for checkin/checkout operations. 

823 

824 Returns: BlobNormalizer instance 

825 """ 

826 raise NotImplementedError(self.get_blob_normalizer) 

827 

828 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes": 

829 """Read gitattributes for the repository. 

830 

831 Args: 

832 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

833 

834 Returns: 

835 GitAttributes object that can be used to match paths 

836 """ 

837 raise NotImplementedError(self.get_gitattributes) 

838 

839 def get_config_stack(self) -> "StackedConfig": 

840 """Return a config stack for this repository. 

841 

842 This stack accesses the configuration for both this repository 

843 itself (.git/config) and the global configuration, which usually 

844 lives in ~/.gitconfig. 

845 

846 Returns: `Config` instance for this repository 

847 """ 

848 from .config import ConfigFile, StackedConfig 

849 

850 local_config = self.get_config() 

851 backends: list[ConfigFile] = [local_config] 

852 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

853 backends.append(self.get_worktree_config()) 

854 

855 backends += StackedConfig.default_backends() 

856 return StackedConfig(backends, writable=local_config) 

857 

858 def get_shallow(self) -> set[ObjectID]: 

859 """Get the set of shallow commits. 

860 

861 Returns: Set of shallow commits. 

862 """ 

863 f = self.get_named_file("shallow") 

864 if f is None: 

865 return set() 

866 with f: 

867 return {line.strip() for line in f} 

868 

869 def update_shallow( 

870 self, new_shallow: set[bytes] | None, new_unshallow: set[bytes] | None 

871 ) -> None: 

872 """Update the list of shallow objects. 

873 

874 Args: 

875 new_shallow: Newly shallow objects 

876 new_unshallow: Newly no longer shallow objects 

877 """ 

878 shallow = self.get_shallow() 

879 if new_shallow: 

880 shallow.update(new_shallow) 

881 if new_unshallow: 

882 shallow.difference_update(new_unshallow) 

883 if shallow: 

884 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

885 else: 

886 self._del_named_file("shallow") 

887 

888 def get_peeled(self, ref: Ref) -> ObjectID: 

889 """Get the peeled value of a ref. 

890 

891 Args: 

892 ref: The refname to peel. 

893 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

894 intermediate tags; if the original ref does not point to a tag, 

895 this will equal the original SHA1. 

896 """ 

897 cached = self.refs.get_peeled(ref) 

898 if cached is not None: 

899 return cached 

900 return peel_sha(self.object_store, self.refs[ref])[1].id 

901 

902 @property 

903 def notes(self) -> "Notes": 

904 """Access notes functionality for this repository. 

905 

906 Returns: 

907 Notes object for accessing notes 

908 """ 

909 from .notes import Notes 

910 

911 return Notes(self.object_store, self.refs) 

912 

913 def get_walker( 

914 self, 

915 include: Sequence[bytes] | None = None, 

916 exclude: Sequence[bytes] | None = None, 

917 order: str = "date", 

918 reverse: bool = False, 

919 max_entries: int | None = None, 

920 paths: Sequence[bytes] | None = None, 

921 rename_detector: Optional["RenameDetector"] = None, 

922 follow: bool = False, 

923 since: int | None = None, 

924 until: int | None = None, 

925 queue_cls: type | None = None, 

926 ) -> "Walker": 

927 """Obtain a walker for this repository. 

928 

929 Args: 

930 include: Iterable of SHAs of commits to include along with their 

931 ancestors. Defaults to [HEAD] 

932 exclude: Iterable of SHAs of commits to exclude along with their 

933 ancestors, overriding includes. 

934 order: ORDER_* constant specifying the order of results. 

935 Anything other than ORDER_DATE may result in O(n) memory usage. 

936 reverse: If True, reverse the order of output, requiring O(n) 

937 memory. 

938 max_entries: The maximum number of entries to yield, or None for 

939 no limit. 

940 paths: Iterable of file or subtree paths to show entries for. 

941 rename_detector: diff.RenameDetector object for detecting 

942 renames. 

943 follow: If True, follow path across renames/copies. Forces a 

944 default rename_detector. 

945 since: Timestamp to list commits after. 

946 until: Timestamp to list commits before. 

947 queue_cls: A class to use for a queue of commits, supporting the 

948 iterator protocol. The constructor takes a single argument, the Walker. 

949 

950 Returns: A `Walker` object 

951 """ 

952 from .walk import Walker, _CommitTimeQueue 

953 

954 if include is None: 

955 include = [self.head()] 

956 

957 # Pass all arguments to Walker explicitly to avoid type issues with **kwargs 

958 return Walker( 

959 self.object_store, 

960 include, 

961 exclude=exclude, 

962 order=order, 

963 reverse=reverse, 

964 max_entries=max_entries, 

965 paths=paths, 

966 rename_detector=rename_detector, 

967 follow=follow, 

968 since=since, 

969 until=until, 

970 get_parents=lambda commit: self.get_parents(commit.id, commit), 

971 queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue, 

972 ) 

973 

974 def __getitem__(self, name: ObjectID | Ref) -> "ShaFile": 

975 """Retrieve a Git object by SHA1 or ref. 

976 

977 Args: 

978 name: A Git object SHA1 or a ref name 

979 Returns: A `ShaFile` object, such as a Commit or Blob 

980 Raises: 

981 KeyError: when the specified ref or object does not exist 

982 """ 

983 if not isinstance(name, bytes): 

984 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

985 if len(name) in (20, 40): 

986 try: 

987 return self.object_store[name] 

988 except (KeyError, ValueError): 

989 pass 

990 try: 

991 return self.object_store[self.refs[name]] 

992 except RefFormatError as exc: 

993 raise KeyError(name) from exc 

994 

995 def __contains__(self, name: bytes) -> bool: 

996 """Check if a specific Git object or ref is present. 

997 

998 Args: 

999 name: Git object SHA1 or ref name 

1000 """ 

1001 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

1002 return name in self.object_store or name in self.refs 

1003 else: 

1004 return name in self.refs 

1005 

1006 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None: 

1007 """Set a ref. 

1008 

1009 Args: 

1010 name: ref name 

1011 value: Ref value - either a ShaFile object, or a hex sha 

1012 """ 

1013 if name.startswith(b"refs/") or name == b"HEAD": 

1014 if isinstance(value, ShaFile): 

1015 self.refs[name] = value.id 

1016 elif isinstance(value, bytes): 

1017 self.refs[name] = value 

1018 else: 

1019 raise TypeError(value) 

1020 else: 

1021 raise ValueError(name) 

1022 

1023 def __delitem__(self, name: bytes) -> None: 

1024 """Remove a ref. 

1025 

1026 Args: 

1027 name: Name of the ref to remove 

1028 """ 

1029 if name.startswith(b"refs/") or name == b"HEAD": 

1030 del self.refs[name] 

1031 else: 

1032 raise ValueError(name) 

1033 

1034 def _get_user_identity( 

1035 self, config: "StackedConfig", kind: str | None = None 

1036 ) -> bytes: 

1037 """Determine the identity to use for new commits.""" 

1038 warnings.warn( 

1039 "use get_user_identity() rather than Repo._get_user_identity", 

1040 DeprecationWarning, 

1041 ) 

1042 return get_user_identity(config) 

1043 

1044 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

1045 """Add or modify graftpoints. 

1046 

1047 Args: 

1048 updated_graftpoints: Dict of commit shas to list of parent shas 

1049 """ 

1050 # Simple validation 

1051 for commit, parents in updated_graftpoints.items(): 

1052 for sha in [commit, *parents]: 

1053 check_hexsha(sha, "Invalid graftpoint") 

1054 

1055 self._graftpoints.update(updated_graftpoints) 

1056 

1057 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None: 

1058 """Remove graftpoints. 

1059 

1060 Args: 

1061 to_remove: List of commit shas 

1062 """ 

1063 for sha in to_remove: 

1064 del self._graftpoints[sha] 

1065 

1066 def _read_heads(self, name: str) -> list[bytes]: 

1067 f = self.get_named_file(name) 

1068 if f is None: 

1069 return [] 

1070 with f: 

1071 return [line.strip() for line in f.readlines() if line.strip()] 

1072 

1073 def get_worktree(self) -> "WorkTree": 

1074 """Get the working tree for this repository. 

1075 

1076 Returns: 

1077 WorkTree instance for performing working tree operations 

1078 

1079 Raises: 

1080 NotImplementedError: If the repository doesn't support working trees 

1081 """ 

1082 raise NotImplementedError( 

1083 "Working tree operations not supported by this repository type" 

1084 ) 

1085 

1086 @replace_me(remove_in="0.26.0") 

1087 def do_commit( 

1088 self, 

1089 message: bytes | None = None, 

1090 committer: bytes | None = None, 

1091 author: bytes | None = None, 

1092 commit_timestamp: float | None = None, 

1093 commit_timezone: int | None = None, 

1094 author_timestamp: float | None = None, 

1095 author_timezone: int | None = None, 

1096 tree: ObjectID | None = None, 

1097 encoding: bytes | None = None, 

1098 ref: Ref | None = b"HEAD", 

1099 merge_heads: list[ObjectID] | None = None, 

1100 no_verify: bool = False, 

1101 sign: bool = False, 

1102 ) -> bytes: 

1103 """Create a new commit. 

1104 

1105 If not specified, committer and author default to 

1106 get_user_identity(..., 'COMMITTER') 

1107 and get_user_identity(..., 'AUTHOR') respectively. 

1108 

1109 Args: 

1110 message: Commit message (bytes or callable that takes (repo, commit) 

1111 and returns bytes) 

1112 committer: Committer fullname 

1113 author: Author fullname 

1114 commit_timestamp: Commit timestamp (defaults to now) 

1115 commit_timezone: Commit timestamp timezone (defaults to GMT) 

1116 author_timestamp: Author timestamp (defaults to commit 

1117 timestamp) 

1118 author_timezone: Author timestamp timezone 

1119 (defaults to commit timestamp timezone) 

1120 tree: SHA1 of the tree root to use (if not specified the 

1121 current index will be committed). 

1122 encoding: Encoding 

1123 ref: Optional ref to commit to (defaults to current branch). 

1124 If None, creates a dangling commit without updating any ref. 

1125 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

1126 no_verify: Skip pre-commit and commit-msg hooks 

1127 sign: GPG Sign the commit (bool, defaults to False, 

1128 pass True to use default GPG key, 

1129 pass a str containing Key ID to use a specific GPG key) 

1130 

1131 Returns: 

1132 New commit SHA1 

1133 """ 

1134 return self.get_worktree().commit( 

1135 message=message, 

1136 committer=committer, 

1137 author=author, 

1138 commit_timestamp=commit_timestamp, 

1139 commit_timezone=commit_timezone, 

1140 author_timestamp=author_timestamp, 

1141 author_timezone=author_timezone, 

1142 tree=tree, 

1143 encoding=encoding, 

1144 ref=ref, 

1145 merge_heads=merge_heads, 

1146 no_verify=no_verify, 

1147 sign=sign, 

1148 ) 

1149 

1150 

def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: if the content does not start with the expected prefix
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Strip the trailing newline (and CR on Windows-authored files).
    return contents[len(prefix):].rstrip(b"\r\n").decode("utf-8")

1164 

1165 

class UnsupportedVersion(Exception):
    """Raised when a repository's format version is not supported."""

    # The offending repositoryformatversion value.
    version: int

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        self.version = version

1176 

1177 

class UnsupportedExtension(Exception):
    """Raised when a repository declares an extension we cannot handle."""

    # Name (and possibly value) of the unsupported extension.
    extension: str

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        self.extension = extension

1188 

1189 

1190class Repo(BaseRepo): 

1191 """A git repository backed by local disk. 

1192 

1193 To open an existing repository, call the constructor with 

1194 the path of the repository. 

1195 

1196 To create a new repository, use the Repo.init class method. 

1197 

1198 Note that a repository object may hold on to resources such 

1199 as file handles for performance reasons; call .close() to free 

1200 up those resources. 

1201 

1202 Attributes: 

1203 path: Path to the working copy (if it exists) or repository control 

1204 directory (if the repository is bare) 

1205 bare: Whether this is a bare repository 

1206 """ 

1207 

1208 path: str 

1209 bare: bool 

1210 object_store: DiskObjectStore 

1211 filter_context: Optional["FilterContext"] 

1212 

    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if no repository layout is found at ``root``
          UnsupportedVersion: for repositoryformatversion other than 0/1
          UnsupportedExtension: for unknown ``extensions.*`` config entries
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file or a .git/objects dir means non-bare;
            # objects/ and refs/ directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a gitfile pointing elsewhere (e.g. a linked worktree);
                # the referenced path may be relative to root.
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            # Bare repository: the root itself is the control directory.
            self._controldir = root
        # "commondir" (if present) points a linked worktree at the main
        # repository's shared control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # No setting at all means the original version 0 layout.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any extension we don't understand makes the repo unsafe
                # to operate on, so refuse to open it.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from two sources: info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None

1334 

1335 def get_worktree(self) -> "WorkTree": 

1336 """Get the working tree for this repository. 

1337 

1338 Returns: 

1339 WorkTree instance for performing working tree operations 

1340 """ 

1341 from .worktree import WorkTree 

1342 

1343 return WorkTree(self, self.path) 

1344 

    def _write_reflog(
        self,
        ref: bytes,
        old_sha: bytes,
        new_sha: bytes,
        committer: bytes | None,
        timestamp: int | None,
        timezone: int | None,
        message: bytes,
    ) -> None:
        """Append one entry to the reflog file for ``ref``.

        Used as the logger callback of the refs container. Missing
        committer/timestamp/timezone values are filled with defaults.
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)
        try:
            # Log files live under logs/<ref>; create parent dirs on demand.
            os.makedirs(os.path.dirname(path))
        except FileExistsError:
            pass
        if committer is None:
            # Fall back to the configured user identity.
            config = self.get_config_stack()
            committer = get_user_identity(config)
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME
        # Append-only: reflog entries are never rewritten here.
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )

1377 

1378 def _reflog_path(self, ref: bytes) -> str: 

1379 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1380 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1381 

1382 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1383 return os.path.join(base, "logs", os.fsdecode(ref)) 

1384 

1385 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]: 

1386 """Read reflog entries for a reference. 

1387 

1388 Args: 

1389 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1390 

1391 Yields: 

1392 reflog.Entry objects in chronological order (oldest first) 

1393 """ 

1394 from .reflog import read_reflog 

1395 

1396 path = self._reflog_path(ref) 

1397 try: 

1398 with open(path, "rb") as f: 

1399 yield from read_reflog(f) 

1400 except FileNotFoundError: 

1401 return 

1402 

1403 @classmethod 

1404 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo": 

1405 """Iterate parent directories to discover a repository. 

1406 

1407 Return a Repo object for the first parent directory that looks like a 

1408 Git repository. 

1409 

1410 Args: 

1411 start: The directory to start discovery from (defaults to '.') 

1412 """ 

1413 path = os.path.abspath(start) 

1414 while True: 

1415 try: 

1416 return cls(path) 

1417 except NotGitRepository: 

1418 new_path, _tail = os.path.split(path) 

1419 if new_path == path: # Root reached 

1420 break 

1421 path = new_path 

1422 start_str = os.fspath(start) 

1423 if isinstance(start_str, bytes): 

1424 start_str = start_str.decode("utf-8") 

1425 raise NotGitRepository(f"No git repository was found at {start_str}") 

1426 

1427 def controldir(self) -> str: 

1428 """Return the path of the control directory.""" 

1429 return self._controldir 

1430 

1431 def commondir(self) -> str: 

1432 """Return the path of the common directory. 

1433 

1434 For a main working tree, it is identical to controldir(). 

1435 

1436 For a linked working tree, it is the control directory of the 

1437 main working tree. 

1438 """ 

1439 return self._commondir 

1440 

1441 def _determine_file_mode(self) -> bool: 

1442 """Probe the file-system to determine whether permissions can be trusted. 

1443 

1444 Returns: True if permissions can be trusted, False otherwise. 

1445 """ 

1446 fname = os.path.join(self.path, ".probe-permissions") 

1447 with open(fname, "w") as f: 

1448 f.write("") 

1449 

1450 st1 = os.lstat(fname) 

1451 try: 

1452 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1453 except PermissionError: 

1454 return False 

1455 st2 = os.lstat(fname) 

1456 

1457 os.unlink(fname) 

1458 

1459 mode_differs = st1.st_mode != st2.st_mode 

1460 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1461 

1462 return mode_differs and st2_has_exec 

1463 

1464 def _determine_symlinks(self) -> bool: 

1465 """Probe the filesystem to determine whether symlinks can be created. 

1466 

1467 Returns: True if symlinks can be created, False otherwise. 

1468 """ 

1469 # TODO(jelmer): Actually probe disk / look at filesystem 

1470 return sys.platform != "win32" 

1471 

1472 def _put_named_file(self, path: str, contents: bytes) -> None: 

1473 """Write a file to the control dir with the given name and contents. 

1474 

1475 Args: 

1476 path: The path to the file, relative to the control dir. 

1477 contents: A string to write to the file. 

1478 """ 

1479 path = path.lstrip(os.path.sep) 

1480 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1481 f.write(contents) 

1482 

1483 def _del_named_file(self, path: str) -> None: 

1484 try: 

1485 os.unlink(os.path.join(self.controldir(), path)) 

1486 except FileNotFoundError: 

1487 return 

1488 

1489 def get_named_file( 

1490 self, 

1491 path: str | bytes, 

1492 basedir: str | None = None, 

1493 ) -> BinaryIO | None: 

1494 """Get a file from the control dir with a specific name. 

1495 

1496 Although the filename should be interpreted as a filename relative to 

1497 the control dir in a disk-based Repo, the object returned need not be 

1498 pointing to a file in that location. 

1499 

1500 Args: 

1501 path: The path to the file, relative to the control dir. 

1502 basedir: Optional argument that specifies an alternative to the 

1503 control dir. 

1504 Returns: An open file object, or None if the file does not exist. 

1505 """ 

1506 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1507 # the dumb web serving code. 

1508 if basedir is None: 

1509 basedir = self.controldir() 

1510 if isinstance(path, bytes): 

1511 path = path.decode("utf-8") 

1512 path = path.lstrip(os.path.sep) 

1513 try: 

1514 return open(os.path.join(basedir, path), "rb") 

1515 except FileNotFoundError: 

1516 return None 

1517 

1518 def index_path(self) -> str: 

1519 """Return path to the index file.""" 

1520 return os.path.join(self.controldir(), INDEX_FILENAME) 

1521 

1522 def open_index(self) -> "Index": 

1523 """Open the index for this repository. 

1524 

1525 Raises: 

1526 NoIndexPresent: If no index is present 

1527 Returns: The matching `Index` 

1528 """ 

1529 from .index import Index 

1530 

1531 if not self.has_index(): 

1532 raise NoIndexPresent 

1533 

1534 # Check for manyFiles feature configuration 

1535 config = self.get_config_stack() 

1536 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1537 skip_hash = False 

1538 index_version = None 

1539 

1540 if many_files: 

1541 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1542 try: 

1543 index_version_str = config.get(b"index", b"version") 

1544 index_version = int(index_version_str) 

1545 except KeyError: 

1546 index_version = 4 # Default to version 4 for manyFiles 

1547 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1548 else: 

1549 # Check for explicit index settings 

1550 try: 

1551 index_version_str = config.get(b"index", b"version") 

1552 index_version = int(index_version_str) 

1553 except KeyError: 

1554 index_version = None 

1555 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1556 

1557 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1558 

1559 def has_index(self) -> bool: 

1560 """Check if an index is present.""" 

1561 # Bare repos must never have index files; non-bare repos may have a 

1562 # missing index file, which is treated as empty. 

1563 return not self.bare 

1564 

1565 @replace_me(remove_in="0.26.0") 

1566 def stage( 

1567 self, 

1568 fs_paths: str 

1569 | bytes 

1570 | os.PathLike[str] 

1571 | Iterable[str | bytes | os.PathLike[str]], 

1572 ) -> None: 

1573 """Stage a set of paths. 

1574 

1575 Args: 

1576 fs_paths: List of paths, relative to the repository path 

1577 """ 

1578 return self.get_worktree().stage(fs_paths) 

1579 

1580 @replace_me(remove_in="0.26.0") 

1581 def unstage(self, fs_paths: Sequence[str]) -> None: 

1582 """Unstage specific file in the index. 

1583 

1584 Args: 

1585 fs_paths: a list of files to unstage, 

1586 relative to the repository path. 

1587 """ 

1588 return self.get_worktree().unstage(fs_paths) 

1589 

    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                # checkout defaults to True for non-bare clones.
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            # Inner try: on any failure after target creation, close the
            # target before the outer handler (possibly) removes the dir.
            try:
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Copy branches under refs/remotes/<origin>/ and tags verbatim.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                    head_ref = _set_default_branch(
                        target.refs, origin, origin_head, branch, ref_message
                    )

                    # Update target head
                    if head_ref:
                        head = _set_head(target.refs, head_ref, ref_message)
                    else:
                        head = None

                    if checkout and head is not None:
                        target.get_worktree().reset_index()
            except BaseException:
                target.close()
                raise
        except BaseException:
            # Roll back the directory we created; leave pre-existing dirs alone.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1684 

1685 @replace_me(remove_in="0.26.0") 

1686 def reset_index(self, tree: bytes | None = None) -> None: 

1687 """Reset the index back to a specific tree. 

1688 

1689 Args: 

1690 tree: Tree SHA to reset to, None for current HEAD tree. 

1691 """ 

1692 return self.get_worktree().reset_index(tree) 

1693 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.

        Supports the ``gitdir:``, ``gitdir/i:`` and ``onbranch:`` includeIf
        conditions; the matchers close over this repository's control dir
        and refs.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # A bare pattern matches anywhere in the path hierarchy.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # Trailing slash means "this directory and everything below".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. unborn branch): the condition never matches.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1781 

1782 def get_worktree_config(self) -> "ConfigFile": 

1783 """Get the worktree-specific config. 

1784 

1785 Returns: 

1786 ConfigFile object for the worktree config 

1787 """ 

1788 from .config import ConfigFile 

1789 

1790 path = os.path.join(self.commondir(), "config.worktree") 

1791 try: 

1792 # Pass condition matchers for includeIf evaluation 

1793 condition_matchers = self._get_config_condition_matchers() 

1794 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1795 except FileNotFoundError: 

1796 cf = ConfigFile() 

1797 cf.path = path 

1798 return cf 

1799 

1800 def get_config(self) -> "ConfigFile": 

1801 """Retrieve the config object. 

1802 

1803 Returns: `ConfigFile` object for the ``.git/config`` file. 

1804 """ 

1805 from .config import ConfigFile 

1806 

1807 path = os.path.join(self._commondir, "config") 

1808 try: 

1809 # Pass condition matchers for includeIf evaluation 

1810 condition_matchers = self._get_config_condition_matchers() 

1811 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1812 except FileNotFoundError: 

1813 ret = ConfigFile() 

1814 ret.path = path 

1815 return ret 

1816 

1817 def get_rebase_state_manager(self) -> "RebaseStateManager": 

1818 """Get the appropriate rebase state manager for this repository. 

1819 

1820 Returns: DiskRebaseStateManager instance 

1821 """ 

1822 import os 

1823 

1824 from .rebase import DiskRebaseStateManager 

1825 

1826 path = os.path.join(self.controldir(), "rebase-merge") 

1827 return DiskRebaseStateManager(path) 

1828 

1829 def get_description(self) -> bytes | None: 

1830 """Retrieve the description of this repository. 

1831 

1832 Returns: Description as bytes or None. 

1833 """ 

1834 path = os.path.join(self._controldir, "description") 

1835 try: 

1836 with GitFile(path, "rb") as f: 

1837 return f.read() 

1838 except FileNotFoundError: 

1839 return None 

1840 

1841 def __repr__(self) -> str: 

1842 """Return string representation of this repository.""" 

1843 return f"<Repo at {self.path!r}>" 

1844 

1845 def set_description(self, description: bytes) -> None: 

1846 """Set the description for this repository. 

1847 

1848 Args: 

1849 description: Text to set as description for this repository. 

1850 """ 

1851 self._put_named_file("description", description) 

1852 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: Optional["StackedConfig"] = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
    ) -> "Repo":
        """Shared initialization for ``init`` and ``init_bare``.

        Creates the standard control-directory layout under ``controldir``,
        points HEAD at the default branch and writes the initial control
        files, then returns the opened repository.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        # Create the skeleton directory layout (refs/, hooks/, info/, ...).
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            # init.defaultBranch from config, falling back to the built-in.
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # HEAD is a symbolic ref to the (still unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", local_branch_name(default_branch))
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1888 

1889 @classmethod 

1890 def init( 

1891 cls, 

1892 path: str | bytes | os.PathLike[str], 

1893 *, 

1894 mkdir: bool = False, 

1895 config: Optional["StackedConfig"] = None, 

1896 default_branch: bytes | None = None, 

1897 symlinks: bool | None = None, 

1898 format: int | None = None, 

1899 ) -> "Repo": 

1900 """Create a new repository. 

1901 

1902 Args: 

1903 path: Path in which to create the repository 

1904 mkdir: Whether to create the directory 

1905 config: Configuration object 

1906 default_branch: Default branch name 

1907 symlinks: Whether to support symlinks 

1908 format: Repository format version (defaults to 0) 

1909 Returns: `Repo` instance 

1910 """ 

1911 path = os.fspath(path) 

1912 if isinstance(path, bytes): 

1913 path = os.fsdecode(path) 

1914 if mkdir: 

1915 os.mkdir(path) 

1916 controldir = os.path.join(path, CONTROLDIR) 

1917 os.mkdir(controldir) 

1918 _set_filesystem_hidden(controldir) 

1919 return cls._init_maybe_bare( 

1920 path, 

1921 controldir, 

1922 False, 

1923 config=config, 

1924 default_branch=default_branch, 

1925 symlinks=symlinks, 

1926 format=format, 

1927 ) 

1928 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: str | bytes | os.PathLike[str],
        main_repo: "Repo",
        identifier: str | None = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier (defaults to the directory name)
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The new worktree's .git is a gitfile pointing into the main repo's
        # worktrees/<identifier> directory.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Back-pointer from the worktree control dir to the gitfile.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # commondir points two levels up, back at the main control dir.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # The new worktree starts detached at the main repository's HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r

1977 

@classmethod
def init_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    object_store: PackBasedObjectStore | None = None,
    config: Optional["StackedConfig"] = None,
    default_branch: bytes | None = None,
    format: int | None = None,
) -> "Repo":
    """Create a new bare repository.

    ``path`` should already exist and be an empty directory (unless
    ``mkdir`` is given, in which case it is created here).

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create the directory
      object_store: Object store to use
      config: Configuration object
      default_branch: Default branch name
      format: Repository format version (defaults to 0)

    Returns: a `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    # A bare repository keeps its control files directly in ``path``,
    # so the repo path and the control dir are one and the same.
    return cls._init_maybe_bare(
        path,
        path,
        True,
        object_store=object_store,
        config=config,
        default_branch=default_branch,
        format=format,
    )

2016 

# Backwards-compatible alias: ``Repo.create`` is the historical name
# for :meth:`init_bare`.
create = init_bare

2018 

def close(self) -> None:
    """Release all resources held by this repository.

    Closes the underlying object store, then tears down the filter
    context if one was ever created.
    """
    self.object_store.close()
    ctx = self.filter_context
    if ctx is not None:
        ctx.close()
        self.filter_context = None

2026 

def __enter__(self) -> "Repo":
    """Enter the runtime context; the repository itself is the target."""
    return self

2030 

def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Leave the runtime context, closing the repository.

    Returns ``None``, so any in-flight exception is never suppressed.
    """
    self.close()

2039 

def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
    """Read the ``.gitattributes`` file from the working tree.

    Lines are parsed as ``<pattern> <attr> [<attr> ...]``; blank lines,
    comment lines and patterns with no attributes are skipped.

    Returns:
      Dictionary mapping file patterns to an attribute mapping.
      ``-attr`` maps the name to ``b"false"``, ``attr=value`` to the
      value, and a bare ``attr`` to ``b"true"``.
    """
    gitattributes: dict[bytes, dict[bytes, bytes]] = {}
    gitattributes_path = os.path.join(self.path, ".gitattributes")

    # EAFP: open directly instead of the previous exists()+open(),
    # which was racy if the file disappeared between the two calls.
    try:
        f = open(gitattributes_path, "rb")
    except (FileNotFoundError, NotADirectoryError):
        return gitattributes

    with f:
        for line in f:
            line = line.strip()
            if not line or line.startswith(b"#"):
                continue  # blank line or comment

            parts = line.split()
            if len(parts) < 2:
                continue  # a pattern with no attributes carries no info

            pattern = parts[0]
            attrs: dict[bytes, bytes] = {}

            for attr in parts[1:]:
                if attr.startswith(b"-"):
                    # Explicitly unset attribute.
                    attrs[attr[1:]] = b"false"
                elif b"=" in attr:
                    # Attribute set to a specific value.
                    key, value = attr.split(b"=", 1)
                    attrs[key] = value
                else:
                    # Bare attribute: set.
                    attrs[attr] = b"true"

            gitattributes[pattern] = attrs

    return gitattributes

2078 

def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a :class:`FilterBlobNormalizer` for checkin/checkout."""
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Always read fresh config and attributes so the normalizer
    # reflects the repository's current state.
    config_stack = self.get_config_stack()
    git_attributes = self.get_gitattributes()

    ctx = self.filter_context
    if ctx is None:
        # First use: build a registry/context pair and cache it.
        ctx = FilterContext(FilterRegistry(config_stack, self))
        self.filter_context = ctx
    else:
        # Reuse the cached context, picking up any config changes.
        ctx.refresh_config(config_stack)

    return FilterBlobNormalizer(config_stack, git_attributes, filter_context=ctx)

2099 

def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Patterns are collected from (in order): the ``.gitattributes`` blob
    in the given tree (or HEAD), ``.git/info/attributes``, and the
    working-tree ``.gitattributes``.  Later entries are appended after
    earlier ones; presumably later sources take precedence when
    GitAttributes matches a path — confirm against GitAttributes.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    patterns = []

    # Read system gitattributes (TODO: implement this)
    # Read global gitattributes (TODO: implement this)

    # Read repository .gitattributes from index/tree
    if tree is None:
        try:
            # Default to HEAD's tree, peeling an annotated tag if
            # HEAD happens to point at one.
            head = self[b"HEAD"]
            if isinstance(head, Tag):
                _cls, obj = head.object
                head = self.get_object(obj)
            assert isinstance(head, Commit)
            tree = head.tree
        except KeyError:
            # No HEAD (e.g. freshly-initialised repo): no tree-level
            # attributes to read.
            pass

    if tree is not None:
        try:
            tree_obj = self[tree]
            assert isinstance(tree_obj, Tree)
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    attrs_data = BytesIO(attrs_blob.data)
                    for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                        pattern = Pattern(pattern_bytes)
                        patterns.append((pattern, attrs))
        except (KeyError, NotTreeError):
            # Missing object or non-tree SHA: silently fall through to
            # the file-based sources below.
            pass

    # Read .git/info/attributes (repo-local, not version-controlled)
    info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
    if os.path.exists(info_attrs_path):
        with open(info_attrs_path, "rb") as f:
            for pattern_bytes, attrs in parse_git_attributes(f):
                pattern = Pattern(pattern_bytes)
                patterns.append((pattern, attrs))

    # Read .gitattributes from working directory (if it exists)
    working_attrs_path = os.path.join(self.path, ".gitattributes")
    if os.path.exists(working_attrs_path):
        with open(working_attrs_path, "rb") as f:
            for pattern_bytes, attrs in parse_git_attributes(f):
                pattern = Pattern(pattern_bytes)
                patterns.append((pattern, attrs))

    return GitAttributes(patterns)

2166 

@replace_me(remove_in="0.26.0")
def _sparse_checkout_file_path(self) -> str:
    """Deprecated: delegate to the worktree's sparse-checkout path lookup."""
    worktree = self.get_worktree()
    return worktree._sparse_checkout_file_path()

2171 

@replace_me(remove_in="0.26.0")
def configure_for_cone_mode(self) -> None:
    """Deprecated: configure cone-mode sparse-checkout via the worktree."""
    worktree = self.get_worktree()
    return worktree.configure_for_cone_mode()

2176 

@replace_me(remove_in="0.26.0")
def infer_cone_mode(self) -> bool:
    """Deprecated: report whether 'core.sparseCheckoutCone' is enabled.

    Delegates to the active worktree; True iff the config value is 'true'.
    """
    worktree = self.get_worktree()
    return worktree.infer_cone_mode()

2181 

@replace_me(remove_in="0.26.0")
def get_sparse_checkout_patterns(self) -> list[str]:
    """Deprecated: read patterns from info/sparse-checkout via the worktree.

    Returns:
      A list of patterns; empty if the file is missing.
    """
    worktree = self.get_worktree()
    return worktree.get_sparse_checkout_patterns()

2190 

@replace_me(remove_in="0.26.0")
def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
    """Deprecated: write patterns to info/sparse-checkout via the worktree.

    Creates the info/ directory if it does not exist.

    Args:
      patterns: A list of gitignore-style patterns to store.
    """
    worktree = self.get_worktree()
    return worktree.set_sparse_checkout_patterns(patterns)

2201 

@replace_me(remove_in="0.26.0")
def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
    """Deprecated: write cone-mode directory patterns via the worktree.

    For each included directory, adds an inclusion line re-including that
    directory and everything under it; duplicate lines are never added.
    """
    worktree = self.get_worktree()
    return worktree.set_cone_mode_patterns(dirs)

2211 

2212 

2213class MemoryRepo(BaseRepo): 

2214 """Repo that stores refs, objects, and named files in memory. 

2215 

2216 MemoryRepos are always bare: they have no working tree and no index, since 

2217 those have a stronger dependency on the filesystem. 

2218 """ 

2219 

2220 filter_context: Optional["FilterContext"] 

2221 

def __init__(self) -> None:
    """Create a new, empty repository held entirely in memory."""
    from .config import ConfigFile

    # The reflog is a plain list of tuples, appended to by the refs
    # container's logger callback.
    self._reflog: list[Any] = []
    BaseRepo.__init__(
        self,
        MemoryObjectStore(),
        DictRefsContainer({}, logger=self._append_reflog),
    )
    self._named_files: dict[str, bytes] = {}
    self.bare = True
    self._config = ConfigFile()
    self._description: bytes | None = None
    self.filter_context = None

2234 

def _append_reflog(
    self,
    ref: bytes,
    old_sha: bytes | None,
    new_sha: bytes | None,
    committer: bytes | None,
    timestamp: int | None,
    timezone: int | None,
    message: bytes | None,
) -> None:
    """Record one ref update in the in-memory reflog (refs logger callback)."""
    entry = (ref, old_sha, new_sha, committer, timestamp, timezone, message)
    self._reflog.append(entry)

2248 

def set_description(self, description: bytes) -> None:
    """Set the description for this repository.

    Args:
      description: Text to store as the repository description.
    """
    self._description = description

2256 

2257 def get_description(self) -> bytes | None: 

2258 """Get the description of this repository. 

2259 

2260 Returns: 

2261 Repository description as bytes 

2262 """ 

2263 return self._description 

2264 

def _determine_file_mode(self) -> bool:
    """Report whether file-permission bits can be trusted.

    An in-memory repo has no filesystem to probe, so mirror the platform
    default: trust modes everywhere except Windows.

    Returns: True if permissions can be trusted, False otherwise.
    """
    if sys.platform == "win32":
        return False
    return True

2271 

def _determine_symlinks(self) -> bool:
    """Report whether symlinks can be assumed to work.

    An in-memory repo has no filesystem to probe, so mirror the platform
    default: assume symlink support everywhere except Windows.

    Returns: True if symlinks are assumed supported, False otherwise.
    """
    if sys.platform == "win32":
        return False
    return True

2278 

def _put_named_file(self, path: str, contents: bytes) -> None:
    """Store a control-dir file in the in-memory mapping.

    Args:
      path: Path of the file, relative to the control dir.
      contents: Raw bytes to store under that name.
    """
    self._named_files[path] = contents

2287 

def _del_named_file(self, path: str) -> None:
    """Remove a named control-dir file, ignoring missing entries.

    Args:
      path: Path of the file, relative to the control dir.
    """
    # dict.pop with a default is the idiomatic "delete if present";
    # it replaces the previous try/del/except KeyError dance.
    self._named_files.pop(path, None)

2293 

2294 def get_named_file( 

2295 self, 

2296 path: str | bytes, 

2297 basedir: str | None = None, 

2298 ) -> BytesIO | None: 

2299 """Get a file from the control dir with a specific name. 

2300 

2301 Although the filename should be interpreted as a filename relative to 

2302 the control dir in a disk-baked Repo, the object returned need not be 

2303 pointing to a file in that location. 

2304 

2305 Args: 

2306 path: The path to the file, relative to the control dir. 

2307 basedir: Optional base directory for the path 

2308 Returns: An open file object, or None if the file does not exist. 

2309 """ 

2310 path_str = path.decode() if isinstance(path, bytes) else path 

2311 contents = self._named_files.get(path_str, None) 

2312 if contents is None: 

2313 return None 

2314 return BytesIO(contents) 

2315 

def open_index(self) -> "Index":
    """Always fail: memory repositories are bare and keep no index.

    Raises:
      NoIndexPresent: unconditionally.
    """
    raise NoIndexPresent()

2323 

def get_config(self) -> "ConfigFile":
    """Return the repository-local configuration object.

    Returns: the in-memory `ConfigFile` created at construction time.
    """
    return self._config

2330 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Return the rebase state manager suited to in-memory repositories.

    Returns: a MemoryRebaseStateManager bound to this repository.
    """
    from .rebase import MemoryRebaseStateManager

    return MemoryRebaseStateManager(self)

2339 

def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a :class:`FilterBlobNormalizer` for checkin/checkout."""
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Always read fresh config and attributes so the normalizer
    # reflects the repository's current state.
    config_stack = self.get_config_stack()
    git_attributes = self.get_gitattributes()

    ctx = self.filter_context
    if ctx is None:
        # First use: build a registry/context pair and cache it.
        ctx = FilterContext(FilterRegistry(config_stack, self))
        self.filter_context = ctx
    else:
        # Reuse the cached context, picking up any config changes.
        ctx.refresh_config(config_stack)

    return FilterBlobNormalizer(config_stack, git_attributes, filter_context=ctx)

2360 

def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Return an empty attribute set for this in-memory repository.

    A MemoryRepo has no working tree, index, or control directory, so
    there is nowhere for gitattributes files to live.
    """
    from .attrs import GitAttributes

    return GitAttributes([])

2368 

def close(self) -> None:
    """Release resources held by this in-memory repository.

    Only the lazily-created filter context needs tearing down; calling
    this twice is safe.
    """
    ctx = self.filter_context
    if ctx is not None:
        ctx.close()
        self.filter_context = None

2375 

def do_commit(
    self,
    message: bytes | None = None,
    committer: bytes | None = None,
    author: bytes | None = None,
    commit_timestamp: float | None = None,
    commit_timezone: int | None = None,
    author_timestamp: float | None = None,
    author_timezone: int | None = None,
    tree: ObjectID | None = None,
    encoding: bytes | None = None,
    ref: Ref | None = b"HEAD",
    merge_heads: list[ObjectID] | None = None,
    no_verify: bool = False,
    sign: bool = False,
) -> bytes:
    """Create a new commit.

    This is a simplified implementation for in-memory repositories that
    doesn't support worktree operations or hooks.

    Args:
      message: Commit message; may also be a callable taking
        ``(repo, commit)`` and returning the message bytes
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit timestamp)
      author_timezone: Author timestamp timezone (defaults to commit timezone)
      tree: SHA1 of the tree root to use (required for MemoryRepo)
      encoding: Encoding
      ref: Optional ref to commit to (defaults to current branch).
        If None, creates a dangling commit without updating any ref.
      merge_heads: Merge heads
      no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
      sign: GPG Sign the commit (ignored for MemoryRepo)

    Returns:
      New commit SHA1

    Raises:
      ValueError: if ``tree`` is missing/malformed or no message is given
      CommitError: if the ref changed concurrently during the commit
    """
    import time

    from .objects import Commit

    # Unlike the on-disk Repo, there is no index to commit from, so the
    # tree must be supplied explicitly.
    if tree is None:
        raise ValueError("tree must be specified for MemoryRepo")

    c = Commit()
    if len(tree) != 40:
        raise ValueError("tree must be a 40-byte hex sha string")
    c.tree = tree

    config = self.get_config_stack()
    if merge_heads is None:
        merge_heads = []
    # Fall back to configured identities when committer/author are omitted.
    if committer is None:
        committer = get_user_identity(config, kind="COMMITTER")
    check_user_identity(committer)
    c.committer = committer
    if commit_timestamp is None:
        commit_timestamp = time.time()
    c.commit_time = int(commit_timestamp)
    if commit_timezone is None:
        commit_timezone = 0  # GMT
    c.commit_timezone = commit_timezone
    if author is None:
        author = get_user_identity(config, kind="AUTHOR")
    c.author = author
    check_user_identity(author)
    # Author time/zone default to the commit's values.
    if author_timestamp is None:
        author_timestamp = commit_timestamp
    c.author_time = int(author_timestamp)
    if author_timezone is None:
        author_timezone = commit_timezone
    c.author_timezone = author_timezone
    if encoding is None:
        try:
            encoding = config.get(("i18n",), "commitEncoding")
        except KeyError:
            pass  # no configured encoding: leave the field unset
    if encoding is not None:
        c.encoding = encoding

    # The message may be a callable that receives the repository and the
    # commit object assembled so far, and returns the message bytes.
    if callable(message):
        message = message(self, c)
        if message is None:
            raise ValueError("Message callback returned None")

    if message is None:
        raise ValueError("No commit message specified")

    c.message = message

    if ref is None:
        # Create a dangling commit: store the object, update no ref.
        c.parents = merge_heads
        self.object_store.add_object(c)
    else:
        try:
            # Normal case: ref exists, new commit gets it as first parent
            # and the ref is advanced only if it still points at old_head.
            old_head = self.refs[ref]
            c.parents = [old_head, *merge_heads]
            self.object_store.add_object(c)
            ok = self.refs.set_if_equals(
                ref,
                old_head,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        except KeyError:
            # Ref does not exist yet (e.g. first commit on a branch):
            # create it, but only if nobody else created it meanwhile.
            c.parents = merge_heads
            self.object_store.add_object(c)
            ok = self.refs.add_if_new(
                ref,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        if not ok:
            from .errors import CommitError

            raise CommitError(f"{ref!r} changed during commit")

    return c.id

2505 

@classmethod
def init_bare(
    cls,
    objects: Iterable[ShaFile],
    refs: Mapping[bytes, bytes],
    format: int | None = None,
) -> "MemoryRepo":
    """Create a new bare repository in memory.

    Args:
      objects: Objects for the new repository, as an iterable
      refs: Refs as a dictionary mapping names to object SHA1s
      format: Repository format version (defaults to 0)

    Returns: the populated `MemoryRepo`
    """
    repo = cls()
    for obj in objects:
        repo.object_store.add_object(obj)
    # add_if_new: never clobber a ref that somehow already exists.
    for refname, sha in refs.items():
        repo.refs.add_if_new(refname, sha)
    repo._init_files(bare=True, format=format)
    return repo