Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1001 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence 

38from io import BytesIO 

39from types import TracebackType 

40from typing import ( 

41 TYPE_CHECKING, 

42 Any, 

43 BinaryIO, 

44 Callable, 

45 Optional, 

46 TypeVar, 

47 Union, 

48) 

49 

50if TYPE_CHECKING: 

51 # There are no circular imports here, but we try to defer imports as long 

52 # as possible to reduce start-up time for anything that doesn't need 

53 # these imports. 

54 from .attrs import GitAttributes 

55 from .config import ConditionMatcher, ConfigFile, StackedConfig 

56 from .diff_tree import RenameDetector 

57 from .filters import FilterBlobNormalizer, FilterContext 

58 from .index import Index 

59 from .notes import Notes 

60 from .object_store import BaseObjectStore, GraphWalker 

61 from .pack import UnpackedObject 

62 from .rebase import RebaseStateManager 

63 from .walk import Walker 

64 from .worktree import WorkTree 

65 

66from . import reflog, replace_me 

67from .errors import ( 

68 NoIndexPresent, 

69 NotBlobError, 

70 NotCommitError, 

71 NotGitRepository, 

72 NotTagError, 

73 NotTreeError, 

74 RefFormatError, 

75) 

76from .file import GitFile 

77from .hooks import ( 

78 CommitMsgShellHook, 

79 Hook, 

80 PostCommitShellHook, 

81 PostReceiveShellHook, 

82 PreCommitShellHook, 

83) 

84from .object_store import ( 

85 DiskObjectStore, 

86 MemoryObjectStore, 

87 MissingObjectFinder, 

88 ObjectStoreGraphWalker, 

89 PackBasedObjectStore, 

90 PackCapableObjectStore, 

91 find_shallow, 

92 peel_sha, 

93) 

94from .objects import ( 

95 Blob, 

96 Commit, 

97 ObjectID, 

98 ShaFile, 

99 Tag, 

100 Tree, 

101 check_hexsha, 

102 valid_hexsha, 

103) 

104from .pack import generate_unpacked_objects 

105from .refs import ( 

106 ANNOTATED_TAG_SUFFIX, # noqa: F401 

107 LOCAL_TAG_PREFIX, # noqa: F401 

108 SYMREF, # noqa: F401 

109 DictRefsContainer, 

110 DiskRefsContainer, 

111 InfoRefsContainer, # noqa: F401 

112 Ref, 

113 RefsContainer, 

114 _set_default_branch, 

115 _set_head, 

116 _set_origin_head, 

117 check_ref_format, # noqa: F401 

118 extract_branch_name, 

119 is_per_worktree_ref, 

120 local_branch_name, 

121 read_packed_refs, # noqa: F401 

122 read_packed_refs_with_peeled, # noqa: F401 

123 serialize_refs, 

124 write_packed_refs, # noqa: F401 

125) 

126 

127CONTROLDIR = ".git" 

128OBJECTDIR = "objects" 

129DEFAULT_OFS_DELTA = True 

130 

131T = TypeVar("T", bound="ShaFile") 

132REFSDIR = "refs" 

133REFSDIR_TAGS = "tags" 

134REFSDIR_HEADS = "heads" 

135INDEX_FILENAME = "index" 

136COMMONDIR = "commondir" 

137GITDIR = "gitdir" 

138WORKTREES = "worktrees" 

139 

140BASE_DIRECTORIES = [ 

141 ["branches"], 

142 [REFSDIR], 

143 [REFSDIR, REFSDIR_TAGS], 

144 [REFSDIR, REFSDIR_HEADS], 

145 ["hooks"], 

146 ["info"], 

147] 

148 

149DEFAULT_BRANCH = b"master" 

150 

151 

class InvalidUserIdentity(Exception):
    """Raised when a user identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the identity string that failed validation.

        Args:
            identity: The offending identity text.
        """
        self.identity = identity

158 

159 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined from the host."""

162 

163 

164# TODO(jelmer): Cache? 

165def _get_default_identity() -> tuple[str, str]: 

166 import socket 

167 

168 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

169 username = os.environ.get(name) 

170 if username: 

171 break 

172 else: 

173 username = None 

174 

175 try: 

176 import pwd 

177 except ImportError: 

178 fullname = None 

179 else: 

180 try: 

181 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore] 

182 except KeyError: 

183 fullname = None 

184 else: 

185 if getattr(entry, "gecos", None): 

186 fullname = entry.pw_gecos.split(",")[0] 

187 else: 

188 fullname = None 

189 if username is None: 

190 username = entry.pw_name 

191 if not fullname: 

192 if username is None: 

193 raise DefaultIdentityNotFound("no username found") 

194 fullname = username 

195 email = os.environ.get("EMAIL") 

196 if email is None: 

197 if username is None: 

198 raise DefaultIdentityNotFound("no username found") 

199 email = f"{username}@{socket.gethostname()}" 

200 return (fullname, email) 

201 

202 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
        config: Configuration stack to read from
        kind: Optional kind to return identity for,
            usually either "AUTHOR" or "COMMITTER".

    Returns:
        A user identity as ``name <email>`` bytes.
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # BUG FIX: only fall back to the host identity when it is actually
        # needed. Previously _get_default_identity() was invoked
        # unconditionally, which could raise DefaultIdentityNotFound (and
        # did needless passwd/hostname lookups) even though both name and
        # email had already been resolved from the environment or config.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    if email.startswith(b"<") and email.endswith(b">"):
        # Strip surrounding angle brackets supplied by the user.
        email = email[1:-1]
    return user + b" <" + email + b">"

252 

253 

def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity has the shape ``name <email>`` and contains neither
    NUL bytes nor newlines.

    Args:
        identity: User identity bytestring
    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    malformed = (
        b">" not in remainder or b"\0" in identity or b"\n" in identity
    )
    if malformed:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))

270 

271 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoint lines into a dict.

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    The resulting dictionary maps:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint

    Args:
        graftpoints: Iterator of graftpoint lines

    Returns:
        Dict mapping each commit SHA to its replacement parent SHAs.
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Validate every SHA on the line before recording the graft.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

303 

304 

def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into a newline-joined bytestring.

    The graft dictionary maps:
        <commit sha1>: [<parent sha1>*]

    and each emitted line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)

324 

325 

def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    On all other platforms this is a no-op.

    Args:
        path: Path to mark as hidden.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        # Build a callable for kernel32!SetFileAttributesW(LPCWSTR, DWORD) -> BOOL.
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        # Callers may pass a bytes path even though the annotation says str;
        # the wide-char API needs a str.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here

347 

348 

class ParentsProvider:
    """Provider for commit parent information.

    Parent lookup order: graftpoints first, then shallow cut-offs (which
    have no parents), then the commit graph when available, and finally the
    commit object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict[bytes, list[bytes]]] = None,
        shallows: Optional[Iterable[bytes]] = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Graft information (commit SHA -> replacement parents)
            shallows: Shallow commit SHAs
        """
        self.store = store
        # BUG FIX: the defaults used to be the mutable literals ``{}`` and
        # ``[]``, which are shared across every call; a caller mutating
        # ``provider.grafts`` would have leaked into all later instances.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
            commit_id: SHA of the commit to look up.
            commit: Optional already-loaded commit object matching the SHA.

        Returns:
            List of parent commit SHAs.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            # Shallow boundary: pretend the commit has no parents.
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        parents = commit.parents
        assert isinstance(parents, list)
        return parents

397 

398 

399class BaseRepo: 

400 """Base class for a git repository. 

401 

402 This base class is meant to be used for Repository implementations that e.g. 

403 work on top of a different transport than a standard filesystem path. 

404 

405 Attributes: 

406 object_store: Dictionary-like object for accessing 

407 the objects 

408 refs: Dictionary-like object with the refs in this 

409 repository 

410 """ 

411 

    def __init__(
        self, object_store: "PackCapableObjectStore", refs: RefsContainer
    ) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
            object_store: Object store to use
            refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints (commit SHA -> replacement parents) applied to this
        # repository, and hooks installed by name.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        self.hooks: dict[str, Hook] = {}

429 

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self._determine_file_mode)

436 

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour: assume symlinks work
        # everywhere except native Windows.
        return sys.platform != "win32"

444 

445 def _init_files( 

446 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

447 ) -> None: 

448 """Initialize a default set of named files.""" 

449 from .config import ConfigFile 

450 

451 self._put_named_file("description", b"Unnamed repository") 

452 f = BytesIO() 

453 cf = ConfigFile() 

454 if format is None: 

455 format = 0 

456 if format not in (0, 1): 

457 raise ValueError(f"Unsupported repository format version: {format}") 

458 cf.set("core", "repositoryformatversion", str(format)) 

459 if self._determine_file_mode(): 

460 cf.set("core", "filemode", True) 

461 else: 

462 cf.set("core", "filemode", False) 

463 

464 if symlinks is None and not bare: 

465 symlinks = self._determine_symlinks() 

466 

467 if symlinks is False: 

468 cf.set("core", "symlinks", symlinks) 

469 

470 cf.set("core", "bare", bare) 

471 cf.set("core", "logallrefupdates", True) 

472 cf.write_to_file(f) 

473 self._put_named_file("config", f.getvalue()) 

474 self._put_named_file(os.path.join("info", "exclude"), b"") 

475 

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
            path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.get_named_file)

488 

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
            path: The path to the file, relative to the control dir.
            contents: A string to write to the file.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self._put_named_file)

497 

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Args:
            path: The path to the file, relative to the control dir.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self._del_named_file)

501 

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
            NoIndexPresent: If no index is present
            NotImplementedError: Must be implemented by subclasses.
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

510 

511 def fetch( 

512 self, 

513 target: "BaseRepo", 

514 determine_wants: Optional[ 

515 Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]] 

516 ] = None, 

517 progress: Optional[Callable[..., None]] = None, 

518 depth: Optional[int] = None, 

519 ) -> dict[bytes, bytes]: 

520 """Fetch objects into another repository. 

521 

522 Args: 

523 target: The target repository 

524 determine_wants: Optional function to determine what refs to 

525 fetch. 

526 progress: Optional progress function 

527 depth: Optional shallow fetch depth 

528 Returns: The local refs 

529 """ 

530 if determine_wants is None: 

531 determine_wants = target.object_store.determine_wants_all 

532 count, pack_data = self.fetch_pack_data( 

533 determine_wants, 

534 target.get_graph_walker(), 

535 progress=progress, 

536 depth=depth, 

537 ) 

538 target.object_store.add_pack_data(count, pack_data, progress) 

539 return self.get_refs() 

540 

541 def fetch_pack_data( 

542 self, 

543 determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]], 

544 graph_walker: "GraphWalker", 

545 progress: Optional[Callable[[bytes], None]], 

546 *, 

547 get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None, 

548 depth: Optional[int] = None, 

549 ) -> tuple[int, Iterator["UnpackedObject"]]: 

550 """Fetch the pack data required for a set of revisions. 

551 

552 Args: 

553 determine_wants: Function that takes a dictionary with heads 

554 and returns the list of heads to fetch. 

555 graph_walker: Object that can iterate over the list of revisions 

556 to fetch and has an "ack" method that will be called to acknowledge 

557 that a revision is present. 

558 progress: Simple progress function that will be called with 

559 updated progress strings. 

560 get_tagged: Function that returns a dict of pointed-to sha -> 

561 tag sha for including tags. 

562 depth: Shallow fetch depth 

563 Returns: count and iterator over pack data 

564 """ 

565 missing_objects = self.find_missing_objects( 

566 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

567 ) 

568 if missing_objects is None: 

569 return 0, iter([]) 

570 remote_has = missing_objects.get_remote_has() 

571 object_ids = list(missing_objects) 

572 return len(object_ids), generate_unpacked_objects( 

573 self.object_store, object_ids, progress=progress, other_haves=remote_has 

574 ) 

575 

    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Optional[Callable[[bytes], None]],
        *,
        get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to
                acknowledge that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
            when the shallow short-circuit means no pack should be sent.
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Shallow boundary the client advertised before this negotiation.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): if depth is set but graph_walker has no
            # ``shallow`` attribute, ``unshallow`` is never bound and the
            # checks below would raise NameError — confirm all walkers used
            # with depth carry a ``shallow`` attribute.
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
                commit: Commit object

            Returns:
                List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

671 

672 def generate_pack_data( 

673 self, 

674 have: set[ObjectID], 

675 want: set[ObjectID], 

676 *, 

677 shallow: Optional[set[ObjectID]] = None, 

678 progress: Optional[Callable[[str], None]] = None, 

679 ofs_delta: Optional[bool] = None, 

680 ) -> tuple[int, Iterator["UnpackedObject"]]: 

681 """Generate pack data objects for a set of wants/haves. 

682 

683 Args: 

684 have: List of SHA1s of objects that should not be sent 

685 want: List of SHA1s of objects that should be sent 

686 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits) 

687 ofs_delta: Whether OFS deltas can be included 

688 progress: Optional progress reporting method 

689 """ 

690 if shallow is None: 

691 shallow = self.get_shallow() 

692 return self.object_store.generate_pack_data( 

693 have, 

694 want, 

695 shallow=shallow, 

696 progress=progress, 

697 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA, 

698 ) 

699 

700 def get_graph_walker( 

701 self, heads: Optional[list[ObjectID]] = None 

702 ) -> ObjectStoreGraphWalker: 

703 """Retrieve a graph walker. 

704 

705 A graph walker is used by a remote repository (or proxy) 

706 to find out which objects are present in this repository. 

707 

708 Args: 

709 heads: Repository heads to use (optional) 

710 Returns: A graph walker object 

711 """ 

712 if heads is None: 

713 heads = [ 

714 sha 

715 for sha in self.refs.as_dict(b"refs/heads").values() 

716 if sha in self.object_store 

717 ] 

718 parents_provider = ParentsProvider(self.object_store) 

719 return ObjectStoreGraphWalker( 

720 heads, 

721 parents_provider.get_parents, 

722 shallow=self.get_shallow(), 

723 update_shallow=self.update_shallow, 

724 ) 

725 

    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

732 

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        # TODO: move this method to WorkTree
        return self.refs[b"HEAD"]

737 

738 def _get_object(self, sha: bytes, cls: type[T]) -> T: 

739 assert len(sha) in (20, 40) 

740 ret = self.get_object(sha) 

741 if not isinstance(ret, cls): 

742 if cls is Commit: 

743 raise NotCommitError(ret.id) 

744 elif cls is Blob: 

745 raise NotBlobError(ret.id) 

746 elif cls is Tree: 

747 raise NotTreeError(ret.id) 

748 elif cls is Tag: 

749 raise NotTagError(ret.id) 

750 else: 

751 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

752 return ret 

753 

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
            sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
            KeyError: when the object can not be found
        """
        return self.object_store[sha]

764 

    def parents_provider(self) -> ParentsProvider:
        """Get a parents provider for this repository.

        Returns:
            ParentsProvider instance configured with this repository's
            graftpoints and shallow commits
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

776 

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
            sha: SHA of the commit for which to retrieve the parents
            commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

789 

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.get_config)

796 

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.get_worktree_config)

800 

    def get_description(self) -> Optional[bytes]:
        """Retrieve the description for this repository.

        Returns: Bytes with the description of the repository
            as set by the user.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.get_description)

808 

    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
            description: Text to set as description for this repository.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.set_description)

816 

    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.get_rebase_state_manager)

823 

    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Returns: BlobNormalizer instance

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.get_blob_normalizer)

830 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(self.get_gitattributes)

841 

842 def get_config_stack(self) -> "StackedConfig": 

843 """Return a config stack for this repository. 

844 

845 This stack accesses the configuration for both this repository 

846 itself (.git/config) and the global configuration, which usually 

847 lives in ~/.gitconfig. 

848 

849 Returns: `Config` instance for this repository 

850 """ 

851 from .config import ConfigFile, StackedConfig 

852 

853 local_config = self.get_config() 

854 backends: list[ConfigFile] = [local_config] 

855 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

856 backends.append(self.get_worktree_config()) 

857 

858 backends += StackedConfig.default_backends() 

859 return StackedConfig(backends, writable=local_config) 

860 

861 def get_shallow(self) -> set[ObjectID]: 

862 """Get the set of shallow commits. 

863 

864 Returns: Set of shallow commits. 

865 """ 

866 f = self.get_named_file("shallow") 

867 if f is None: 

868 return set() 

869 with f: 

870 return {line.strip() for line in f} 

871 

872 def update_shallow( 

873 self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]] 

874 ) -> None: 

875 """Update the list of shallow objects. 

876 

877 Args: 

878 new_shallow: Newly shallow objects 

879 new_unshallow: Newly no longer shallow objects 

880 """ 

881 shallow = self.get_shallow() 

882 if new_shallow: 

883 shallow.update(new_shallow) 

884 if new_unshallow: 

885 shallow.difference_update(new_unshallow) 

886 if shallow: 

887 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

888 else: 

889 self._del_named_file("shallow") 

890 

891 def get_peeled(self, ref: Ref) -> ObjectID: 

892 """Get the peeled value of a ref. 

893 

894 Args: 

895 ref: The refname to peel. 

896 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

897 intermediate tags; if the original ref does not point to a tag, 

898 this will equal the original SHA1. 

899 """ 

900 cached = self.refs.get_peeled(ref) 

901 if cached is not None: 

902 return cached 

903 return peel_sha(self.object_store, self.refs[ref])[1].id 

904 

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
            Notes object for accessing notes
        """
        # Imported lazily to keep module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)

915 

    def get_walker(
        self,
        include: Optional[Sequence[bytes]] = None,
        exclude: Optional[Sequence[bytes]] = None,
        order: str = "date",
        reverse: bool = False,
        max_entries: Optional[int] = None,
        paths: Optional[Sequence[bytes]] = None,
        rename_detector: Optional["RenameDetector"] = None,
        follow: bool = False,
        since: Optional[int] = None,
        until: Optional[int] = None,
        queue_cls: Optional[type] = None,
    ) -> "Walker":
        """Obtain a walker for this repository.

        Args:
            include: Iterable of SHAs of commits to include along with their
                ancestors. Defaults to [HEAD]
            exclude: Iterable of SHAs of commits to exclude along with their
                ancestors, overriding includes.
            order: ORDER_* constant specifying the order of results.
                Anything other than ORDER_DATE may result in O(n) memory usage.
            reverse: If True, reverse the order of output, requiring O(n)
                memory.
            max_entries: The maximum number of entries to yield, or None for
                no limit.
            paths: Iterable of file or subtree paths to show entries for.
            rename_detector: diff.RenameDetector object for detecting
                renames.
            follow: If True, follow path across renames/copies. Forces a
                default rename_detector.
            since: Timestamp to list commits after.
            until: Timestamp to list commits before.
            queue_cls: A class to use for a queue of commits, supporting the
                iterator protocol. The constructor takes a single argument, the
                Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker, _CommitTimeQueue

        if include is None:
            include = [self.head()]

        # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
        return Walker(
            self.object_store,
            include,
            exclude=exclude,
            order=order,
            reverse=reverse,
            max_entries=max_entries,
            paths=paths,
            rename_detector=rename_detector,
            follow=follow,
            since=since,
            until=until,
            # Graft-aware parent lookup so the walk honours graftpoints.
            get_parents=lambda commit: self.get_parents(commit.id, commit),
            queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
        )

976 

977 def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile": 

978 """Retrieve a Git object by SHA1 or ref. 

979 

980 Args: 

981 name: A Git object SHA1 or a ref name 

982 Returns: A `ShaFile` object, such as a Commit or Blob 

983 Raises: 

984 KeyError: when the specified ref or object does not exist 

985 """ 

986 if not isinstance(name, bytes): 

987 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

988 if len(name) in (20, 40): 

989 try: 

990 return self.object_store[name] 

991 except (KeyError, ValueError): 

992 pass 

993 try: 

994 return self.object_store[self.refs[name]] 

995 except RefFormatError as exc: 

996 raise KeyError(name) from exc 

997 

998 def __contains__(self, name: bytes) -> bool: 

999 """Check if a specific Git object or ref is present. 

1000 

1001 Args: 

1002 name: Git object SHA1 or ref name 

1003 """ 

1004 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

1005 return name in self.object_store or name in self.refs 

1006 else: 

1007 return name in self.refs 

1008 

1009 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

1010 """Set a ref. 

1011 

1012 Args: 

1013 name: ref name 

1014 value: Ref value - either a ShaFile object, or a hex sha 

1015 """ 

1016 if name.startswith(b"refs/") or name == b"HEAD": 

1017 if isinstance(value, ShaFile): 

1018 self.refs[name] = value.id 

1019 elif isinstance(value, bytes): 

1020 self.refs[name] = value 

1021 else: 

1022 raise TypeError(value) 

1023 else: 

1024 raise ValueError(name) 

1025 

1026 def __delitem__(self, name: bytes) -> None: 

1027 """Remove a ref. 

1028 

1029 Args: 

1030 name: Name of the ref to remove 

1031 """ 

1032 if name.startswith(b"refs/") or name == b"HEAD": 

1033 del self.refs[name] 

1034 else: 

1035 raise ValueError(name) 

1036 

1037 def _get_user_identity( 

1038 self, config: "StackedConfig", kind: Optional[str] = None 

1039 ) -> bytes: 

1040 """Determine the identity to use for new commits.""" 

1041 warnings.warn( 

1042 "use get_user_identity() rather than Repo._get_user_identity", 

1043 DeprecationWarning, 

1044 ) 

1045 return get_user_identity(config) 

1046 

1047 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

1048 """Add or modify graftpoints. 

1049 

1050 Args: 

1051 updated_graftpoints: Dict of commit shas to list of parent shas 

1052 """ 

1053 # Simple validation 

1054 for commit, parents in updated_graftpoints.items(): 

1055 for sha in [commit, *parents]: 

1056 check_hexsha(sha, "Invalid graftpoint") 

1057 

1058 self._graftpoints.update(updated_graftpoints) 

1059 

1060 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None: 

1061 """Remove graftpoints. 

1062 

1063 Args: 

1064 to_remove: List of commit shas 

1065 """ 

1066 for sha in to_remove: 

1067 del self._graftpoints[sha] 

1068 

1069 def _read_heads(self, name: str) -> list[bytes]: 

1070 f = self.get_named_file(name) 

1071 if f is None: 

1072 return [] 

1073 with f: 

1074 return [line.strip() for line in f.readlines() if line.strip()] 

1075 

1076 def get_worktree(self) -> "WorkTree": 

1077 """Get the working tree for this repository. 

1078 

1079 Returns: 

1080 WorkTree instance for performing working tree operations 

1081 

1082 Raises: 

1083 NotImplementedError: If the repository doesn't support working trees 

1084 """ 

1085 raise NotImplementedError( 

1086 "Working tree operations not supported by this repository type" 

1087 ) 

1088 

1089 @replace_me(remove_in="0.26.0") 

1090 def do_commit( 

1091 self, 

1092 message: Optional[bytes] = None, 

1093 committer: Optional[bytes] = None, 

1094 author: Optional[bytes] = None, 

1095 commit_timestamp: Optional[float] = None, 

1096 commit_timezone: Optional[int] = None, 

1097 author_timestamp: Optional[float] = None, 

1098 author_timezone: Optional[int] = None, 

1099 tree: Optional[ObjectID] = None, 

1100 encoding: Optional[bytes] = None, 

1101 ref: Optional[Ref] = b"HEAD", 

1102 merge_heads: Optional[list[ObjectID]] = None, 

1103 no_verify: bool = False, 

1104 sign: bool = False, 

1105 ) -> bytes: 

1106 """Create a new commit. 

1107 

1108 If not specified, committer and author default to 

1109 get_user_identity(..., 'COMMITTER') 

1110 and get_user_identity(..., 'AUTHOR') respectively. 

1111 

1112 Args: 

1113 message: Commit message (bytes or callable that takes (repo, commit) 

1114 and returns bytes) 

1115 committer: Committer fullname 

1116 author: Author fullname 

1117 commit_timestamp: Commit timestamp (defaults to now) 

1118 commit_timezone: Commit timestamp timezone (defaults to GMT) 

1119 author_timestamp: Author timestamp (defaults to commit 

1120 timestamp) 

1121 author_timezone: Author timestamp timezone 

1122 (defaults to commit timestamp timezone) 

1123 tree: SHA1 of the tree root to use (if not specified the 

1124 current index will be committed). 

1125 encoding: Encoding 

1126 ref: Optional ref to commit to (defaults to current branch). 

1127 If None, creates a dangling commit without updating any ref. 

1128 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

1129 no_verify: Skip pre-commit and commit-msg hooks 

1130 sign: GPG Sign the commit (bool, defaults to False, 

1131 pass True to use default GPG key, 

1132 pass a str containing Key ID to use a specific GPG key) 

1133 

1134 Returns: 

1135 New commit SHA1 

1136 """ 

1137 return self.get_worktree().commit( 

1138 message=message, 

1139 committer=committer, 

1140 author=author, 

1141 commit_timestamp=commit_timestamp, 

1142 commit_timezone=commit_timezone, 

1143 author_timestamp=author_timestamp, 

1144 author_timezone=author_timezone, 

1145 tree=tree, 

1146 encoding=encoding, 

1147 ref=ref, 

1148 merge_heads=merge_heads, 

1149 no_verify=no_verify, 

1150 sign=sign, 

1151 ) 

1152 

1153 

def read_gitfile(f: BinaryIO) -> str:
    """Parse a ``.git`` gitlink file and return the path it points at.

    The content must begin with the literal prefix ``gitdir: ``; everything
    after it (minus a trailing CR/LF) is the referenced directory.

    Args:
        f: File-like object to read from
    Returns: A path
    Raises:
        ValueError: if the content does not start with ``gitdir: ``
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(prefix) :].rstrip(b"\r\n").decode("utf-8")

1167 

1168 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
            version: The unsupported repository version
        """
        # Pass a message to Exception so str(exc)/tracebacks are informative
        # instead of empty; the raw version stays available as .version.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version

1179 

1180 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
            extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc)/tracebacks are informative
        # instead of empty; the raw name stays available as .extension.
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension

1191 

1192 

1193class Repo(BaseRepo): 

1194 """A git repository backed by local disk. 

1195 

1196 To open an existing repository, call the constructor with 

1197 the path of the repository. 

1198 

1199 To create a new repository, use the Repo.init class method. 

1200 

1201 Note that a repository object may hold on to resources such 

1202 as file handles for performance reasons; call .close() to free 

1203 up those resources. 

1204 

1205 Attributes: 

1206 path: Path to the working copy (if it exists) or repository control 

1207 directory (if the repository is bare) 

1208 bare: Whether this is a bare repository 

1209 """ 

1210 

1211 path: str 

1212 bare: bool 

1213 object_store: DiskObjectStore 

1214 filter_context: Optional["FilterContext"] 

1215 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike[str]],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
            root: Path to the repository's root.
            object_store: ObjectStore to use; if omitted, we use the
                repository's default object store
            bare: True if this is a bare repository.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file/dir means non-bare; objects/ + refs/
            # directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # A .git *file* points at the real control directory
                # (linked worktree / submodule layout).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # 'commondir' (if present) redirects shared state to the main
        # repository's control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any extension we do not understand makes the repo unsafe to
                # operate on, so refuse to open it.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None

1337 

1338 def get_worktree(self) -> "WorkTree": 

1339 """Get the working tree for this repository. 

1340 

1341 Returns: 

1342 WorkTree instance for performing working tree operations 

1343 """ 

1344 from .worktree import WorkTree 

1345 

1346 return WorkTree(self, self.path) 

1347 

1348 def _write_reflog( 

1349 self, 

1350 ref: bytes, 

1351 old_sha: bytes, 

1352 new_sha: bytes, 

1353 committer: Optional[bytes], 

1354 timestamp: Optional[int], 

1355 timezone: Optional[int], 

1356 message: bytes, 

1357 ) -> None: 

1358 from .reflog import format_reflog_line 

1359 

1360 path = self._reflog_path(ref) 

1361 try: 

1362 os.makedirs(os.path.dirname(path)) 

1363 except FileExistsError: 

1364 pass 

1365 if committer is None: 

1366 config = self.get_config_stack() 

1367 committer = get_user_identity(config) 

1368 check_user_identity(committer) 

1369 if timestamp is None: 

1370 timestamp = int(time.time()) 

1371 if timezone is None: 

1372 timezone = 0 # FIXME 

1373 with open(path, "ab") as f: 

1374 f.write( 

1375 format_reflog_line( 

1376 old_sha, new_sha, committer, timestamp, timezone, message 

1377 ) 

1378 + b"\n" 

1379 ) 

1380 

1381 def _reflog_path(self, ref: bytes) -> str: 

1382 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1383 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1384 

1385 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1386 return os.path.join(base, "logs", os.fsdecode(ref)) 

1387 

1388 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]: 

1389 """Read reflog entries for a reference. 

1390 

1391 Args: 

1392 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1393 

1394 Yields: 

1395 reflog.Entry objects in chronological order (oldest first) 

1396 """ 

1397 from .reflog import read_reflog 

1398 

1399 path = self._reflog_path(ref) 

1400 try: 

1401 with open(path, "rb") as f: 

1402 yield from read_reflog(f) 

1403 except FileNotFoundError: 

1404 return 

1405 

1406 @classmethod 

1407 def discover(cls, start: Union[str, bytes, os.PathLike[str]] = ".") -> "Repo": 

1408 """Iterate parent directories to discover a repository. 

1409 

1410 Return a Repo object for the first parent directory that looks like a 

1411 Git repository. 

1412 

1413 Args: 

1414 start: The directory to start discovery from (defaults to '.') 

1415 """ 

1416 path = os.path.abspath(start) 

1417 while True: 

1418 try: 

1419 return cls(path) 

1420 except NotGitRepository: 

1421 new_path, _tail = os.path.split(path) 

1422 if new_path == path: # Root reached 

1423 break 

1424 path = new_path 

1425 start_str = os.fspath(start) 

1426 if isinstance(start_str, bytes): 

1427 start_str = start_str.decode("utf-8") 

1428 raise NotGitRepository(f"No git repository was found at {start_str}") 

1429 

1430 def controldir(self) -> str: 

1431 """Return the path of the control directory.""" 

1432 return self._controldir 

1433 

1434 def commondir(self) -> str: 

1435 """Return the path of the common directory. 

1436 

1437 For a main working tree, it is identical to controldir(). 

1438 

1439 For a linked working tree, it is the control directory of the 

1440 main working tree. 

1441 """ 

1442 return self._commondir 

1443 

1444 def _determine_file_mode(self) -> bool: 

1445 """Probe the file-system to determine whether permissions can be trusted. 

1446 

1447 Returns: True if permissions can be trusted, False otherwise. 

1448 """ 

1449 fname = os.path.join(self.path, ".probe-permissions") 

1450 with open(fname, "w") as f: 

1451 f.write("") 

1452 

1453 st1 = os.lstat(fname) 

1454 try: 

1455 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1456 except PermissionError: 

1457 return False 

1458 st2 = os.lstat(fname) 

1459 

1460 os.unlink(fname) 

1461 

1462 mode_differs = st1.st_mode != st2.st_mode 

1463 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1464 

1465 return mode_differs and st2_has_exec 

1466 

1467 def _determine_symlinks(self) -> bool: 

1468 """Probe the filesystem to determine whether symlinks can be created. 

1469 

1470 Returns: True if symlinks can be created, False otherwise. 

1471 """ 

1472 # TODO(jelmer): Actually probe disk / look at filesystem 

1473 return sys.platform != "win32" 

1474 

1475 def _put_named_file(self, path: str, contents: bytes) -> None: 

1476 """Write a file to the control dir with the given name and contents. 

1477 

1478 Args: 

1479 path: The path to the file, relative to the control dir. 

1480 contents: A string to write to the file. 

1481 """ 

1482 path = path.lstrip(os.path.sep) 

1483 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1484 f.write(contents) 

1485 

1486 def _del_named_file(self, path: str) -> None: 

1487 try: 

1488 os.unlink(os.path.join(self.controldir(), path)) 

1489 except FileNotFoundError: 

1490 return 

1491 

1492 def get_named_file( 

1493 self, 

1494 path: Union[str, bytes], 

1495 basedir: Optional[str] = None, 

1496 ) -> Optional[BinaryIO]: 

1497 """Get a file from the control dir with a specific name. 

1498 

1499 Although the filename should be interpreted as a filename relative to 

1500 the control dir in a disk-based Repo, the object returned need not be 

1501 pointing to a file in that location. 

1502 

1503 Args: 

1504 path: The path to the file, relative to the control dir. 

1505 basedir: Optional argument that specifies an alternative to the 

1506 control dir. 

1507 Returns: An open file object, or None if the file does not exist. 

1508 """ 

1509 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1510 # the dumb web serving code. 

1511 if basedir is None: 

1512 basedir = self.controldir() 

1513 if isinstance(path, bytes): 

1514 path = path.decode("utf-8") 

1515 path = path.lstrip(os.path.sep) 

1516 try: 

1517 return open(os.path.join(basedir, path), "rb") 

1518 except FileNotFoundError: 

1519 return None 

1520 

1521 def index_path(self) -> str: 

1522 """Return path to the index file.""" 

1523 return os.path.join(self.controldir(), INDEX_FILENAME) 

1524 

1525 def open_index(self) -> "Index": 

1526 """Open the index for this repository. 

1527 

1528 Raises: 

1529 NoIndexPresent: If no index is present 

1530 Returns: The matching `Index` 

1531 """ 

1532 from .index import Index 

1533 

1534 if not self.has_index(): 

1535 raise NoIndexPresent 

1536 

1537 # Check for manyFiles feature configuration 

1538 config = self.get_config_stack() 

1539 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1540 skip_hash = False 

1541 index_version = None 

1542 

1543 if many_files: 

1544 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1545 try: 

1546 index_version_str = config.get(b"index", b"version") 

1547 index_version = int(index_version_str) 

1548 except KeyError: 

1549 index_version = 4 # Default to version 4 for manyFiles 

1550 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1551 else: 

1552 # Check for explicit index settings 

1553 try: 

1554 index_version_str = config.get(b"index", b"version") 

1555 index_version = int(index_version_str) 

1556 except KeyError: 

1557 index_version = None 

1558 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1559 

1560 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1561 

1562 def has_index(self) -> bool: 

1563 """Check if an index is present.""" 

1564 # Bare repos must never have index files; non-bare repos may have a 

1565 # missing index file, which is treated as empty. 

1566 return not self.bare 

1567 

1568 @replace_me(remove_in="0.26.0") 

1569 def stage( 

1570 self, 

1571 fs_paths: Union[ 

1572 str, bytes, os.PathLike[str], Iterable[Union[str, bytes, os.PathLike[str]]] 

1573 ], 

1574 ) -> None: 

1575 """Stage a set of paths. 

1576 

1577 Args: 

1578 fs_paths: List of paths, relative to the repository path 

1579 """ 

1580 return self.get_worktree().stage(fs_paths) 

1581 

1582 @replace_me(remove_in="0.26.0") 

1583 def unstage(self, fs_paths: Sequence[str]) -> None: 

1584 """Unstage specific file in the index. 

1585 

1586 Args: 

1587 fs_paths: a list of files to unstage, 

1588 relative to the repository path. 

1589 """ 

1590 return self.get_worktree().unstage(fs_paths) 

1591 

    def clone(
        self,
        target_path: Union[str, bytes, os.PathLike[str]],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: Optional[bool] = None,
        branch: Optional[bytes] = None,
        progress: Optional[Callable[[str], None]] = None,
        depth: Optional[int] = None,
        symlinks: Optional[bool] = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
            target_path: Target path
            mkdir: Create the target directory
            bare: Whether to create a bare repository
            checkout: Whether or not to check-out HEAD after cloning
            origin: Base name for refs in target repository
                cloned from this repository
            branch: Optional branch or tag to be used as HEAD in the new repository
                instead of this repository's HEAD.
            progress: Optional progress function
            depth: Depth at which to fetch
            symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            # checkout defaults to True for non-bare clones and is invalid
            # for bare ones.
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the 'origin' remote in the target.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                # Copy objects, then mirror branches under refs/remotes/<origin>
                # and tags under refs/tags.
                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Close the half-initialized target before the outer handler
                # removes its directory.
                target.close()
                raise
        except BaseException:
            # Roll back the directory we created, then re-raise.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1686 

1687 @replace_me(remove_in="0.26.0") 

1688 def reset_index(self, tree: Optional[bytes] = None) -> None: 

1689 """Reset the index back to a specific tree. 

1690 

1691 Args: 

1692 tree: Tree SHA to reset to, None for current HEAD tree. 

1693 """ 

1694 return self.get_worktree().reset_index(tree) 

1695 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
                pattern: Pattern to match against
                case_sensitive: Whether to match case-sensitively

            Returns:
                True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Relative patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything under that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
                pattern: Pattern to match against

            Returns:
                True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # Detached/unborn HEAD: no branch to match against.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1783 

1784 def get_worktree_config(self) -> "ConfigFile": 

1785 """Get the worktree-specific config. 

1786 

1787 Returns: 

1788 ConfigFile object for the worktree config 

1789 """ 

1790 from .config import ConfigFile 

1791 

1792 path = os.path.join(self.commondir(), "config.worktree") 

1793 try: 

1794 # Pass condition matchers for includeIf evaluation 

1795 condition_matchers = self._get_config_condition_matchers() 

1796 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1797 except FileNotFoundError: 

1798 cf = ConfigFile() 

1799 cf.path = path 

1800 return cf 

1801 

1802 def get_config(self) -> "ConfigFile": 

1803 """Retrieve the config object. 

1804 

1805 Returns: `ConfigFile` object for the ``.git/config`` file. 

1806 """ 

1807 from .config import ConfigFile 

1808 

1809 path = os.path.join(self._commondir, "config") 

1810 try: 

1811 # Pass condition matchers for includeIf evaluation 

1812 condition_matchers = self._get_config_condition_matchers() 

1813 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1814 except FileNotFoundError: 

1815 ret = ConfigFile() 

1816 ret.path = path 

1817 return ret 

1818 

1819 def get_rebase_state_manager(self) -> "RebaseStateManager": 

1820 """Get the appropriate rebase state manager for this repository. 

1821 

1822 Returns: DiskRebaseStateManager instance 

1823 """ 

1824 import os 

1825 

1826 from .rebase import DiskRebaseStateManager 

1827 

1828 path = os.path.join(self.controldir(), "rebase-merge") 

1829 return DiskRebaseStateManager(path) 

1830 

1831 def get_description(self) -> Optional[bytes]: 

1832 """Retrieve the description of this repository. 

1833 

1834 Returns: Description as bytes or None. 

1835 """ 

1836 path = os.path.join(self._controldir, "description") 

1837 try: 

1838 with GitFile(path, "rb") as f: 

1839 return f.read() 

1840 except FileNotFoundError: 

1841 return None 

1842 

1843 def __repr__(self) -> str: 

1844 """Return string representation of this repository.""" 

1845 return f"<Repo at {self.path!r}>" 

1846 

1847 def set_description(self, description: bytes) -> None: 

1848 """Set the description for this repository. 

1849 

1850 Args: 

1851 description: Text to set as description for this repository. 

1852 """ 

1853 self._put_named_file("description", description) 

1854 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        controldir: Union[str, bytes, os.PathLike[str]],
        bare: bool,
        object_store: Optional[PackBasedObjectStore] = None,
        config: Optional["StackedConfig"] = None,
        default_branch: Optional[bytes] = None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Shared initialization for :meth:`init` and :meth:`init_bare`.

        Creates the base directory layout and object store under
        ``controldir``, points HEAD at the default branch, and writes the
        initial control files.

        Args:
            path: Repository root (working copy, or control dir when bare)
            controldir: Directory for the repository's control files
            bare: Whether the repository is bare
            object_store: Object store to use; created on disk if omitted
            config: Config used to look up ``init.defaultBranch``
            default_branch: Branch HEAD should point at; falls back to the
                configured ``init.defaultBranch`` or DEFAULT_BRANCH
            symlinks: Whether to support symlinks (autodetected if None)
            format: Repository format version
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", local_branch_name(default_branch))
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1890 

1891 @classmethod 

1892 def init( 

1893 cls, 

1894 path: Union[str, bytes, os.PathLike[str]], 

1895 *, 

1896 mkdir: bool = False, 

1897 config: Optional["StackedConfig"] = None, 

1898 default_branch: Optional[bytes] = None, 

1899 symlinks: Optional[bool] = None, 

1900 format: Optional[int] = None, 

1901 ) -> "Repo": 

1902 """Create a new repository. 

1903 

1904 Args: 

1905 path: Path in which to create the repository 

1906 mkdir: Whether to create the directory 

1907 config: Configuration object 

1908 default_branch: Default branch name 

1909 symlinks: Whether to support symlinks 

1910 format: Repository format version (defaults to 0) 

1911 Returns: `Repo` instance 

1912 """ 

1913 path = os.fspath(path) 

1914 if isinstance(path, bytes): 

1915 path = os.fsdecode(path) 

1916 if mkdir: 

1917 os.mkdir(path) 

1918 controldir = os.path.join(path, CONTROLDIR) 

1919 os.mkdir(controldir) 

1920 _set_filesystem_hidden(controldir) 

1921 return cls._init_maybe_bare( 

1922 path, 

1923 controldir, 

1924 False, 

1925 config=config, 

1926 default_branch=default_branch, 

1927 symlinks=symlinks, 

1928 format=format, 

1929 ) 

1930 

    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        main_repo: "Repo",
        identifier: Optional[str] = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
            path: Path in which to create the working tree.
            main_repo: Main repository to reference
            identifier: Worktree identifier
            mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The worktree's .git is a *file* pointing at its control dir under
        # the main repository's worktrees/ directory.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # gitdir points back at the worktree's .git file; commondir points at
        # the main control directory (two levels up from worktrees/<id>).
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the worktree's HEAD at the main repository's current commit.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r

1979 

@classmethod
def init_bare(
    cls,
    path: Union[str, bytes, os.PathLike[str]],
    *,
    mkdir: bool = False,
    object_store: Optional[PackBasedObjectStore] = None,
    config: Optional["StackedConfig"] = None,
    default_branch: Optional[bytes] = None,
    format: Optional[int] = None,
) -> "Repo":
    """Create a new bare repository.

    ``path`` should already exist and be an empty directory.

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create the directory
      object_store: Object store to use
      config: Configuration object
      default_branch: Default branch name
      format: Repository format version (defaults to 0)
    Returns: a `Repo` instance
    """
    target = os.fspath(path)
    if isinstance(target, bytes):
        target = os.fsdecode(target)
    if mkdir:
        os.mkdir(target)
    # For a bare repository the control directory IS the repository root,
    # hence path is passed twice.
    return cls._init_maybe_bare(
        target,
        target,
        True,
        object_store=object_store,
        config=config,
        default_branch=default_branch,
        format=format,
    )

create = init_bare

2020 

def close(self) -> None:
    """Release resources held by this repository.

    Closes the object store and, if a filter context was lazily created,
    closes and discards that as well.
    """
    self.object_store.close()
    ctx = self.filter_context
    if ctx is not None:
        ctx.close()
        self.filter_context = None

2028 

2029 def __enter__(self) -> "Repo": 

2030 """Enter context manager.""" 

2031 return self 

2032 

2033 def __exit__( 

2034 self, 

2035 exc_type: Optional[type[BaseException]], 

2036 exc_val: Optional[BaseException], 

2037 exc_tb: Optional[TracebackType], 

2038 ) -> None: 

2039 """Exit context manager and close repository.""" 

2040 self.close() 

2041 

2042 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

2043 """Read .gitattributes file from working tree. 

2044 

2045 Returns: 

2046 Dictionary mapping file patterns to attributes 

2047 """ 

2048 gitattributes = {} 

2049 gitattributes_path = os.path.join(self.path, ".gitattributes") 

2050 

2051 if os.path.exists(gitattributes_path): 

2052 with open(gitattributes_path, "rb") as f: 

2053 for line in f: 

2054 line = line.strip() 

2055 if not line or line.startswith(b"#"): 

2056 continue 

2057 

2058 parts = line.split() 

2059 if len(parts) < 2: 

2060 continue 

2061 

2062 pattern = parts[0] 

2063 attrs = {} 

2064 

2065 for attr in parts[1:]: 

2066 if attr.startswith(b"-"): 

2067 # Unset attribute 

2068 attrs[attr[1:]] = b"false" 

2069 elif b"=" in attr: 

2070 # Set to value 

2071 key, value = attr.split(b"=", 1) 

2072 attrs[key] = value 

2073 else: 

2074 # Set attribute 

2075 attrs[attr] = b"true" 

2076 

2077 gitattributes[pattern] = attrs 

2078 

2079 return gitattributes 

2080 

def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a BlobNormalizer object."""
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Always read configuration and attributes fresh so recent changes
    # are picked up.
    config_stack = self.get_config_stack()
    attributes = self.get_gitattributes()

    ctx = self.filter_context
    if ctx is None:
        # First use: build the registry and context lazily.
        ctx = FilterContext(FilterRegistry(config_stack, self))
        self.filter_context = ctx
    else:
        # Reuse the existing context, re-synced with the current config.
        ctx.refresh_config(config_stack)

    return FilterBlobNormalizer(config_stack, attributes, filter_context=ctx)

2101 

def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Sources are collected in order: the tree's ``.gitattributes``,
    ``$GIT_DIR/info/attributes``, then the working-tree
    ``.gitattributes``; later entries are appended after earlier ones.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    # Accumulated (Pattern, attrs) pairs from all sources.
    patterns = []

    # Read system gitattributes (TODO: implement this)
    # Read global gitattributes (TODO: implement this)

    # Read repository .gitattributes from index/tree
    if tree is None:
        try:
            # Try to get from HEAD
            head = self[b"HEAD"]
            if isinstance(head, Tag):
                # Peel an annotated tag down to the object it points at.
                _cls, obj = head.object
                head = self.get_object(obj)
            assert isinstance(head, Commit)
            tree = head.tree
        except KeyError:
            # No HEAD, no attributes from tree
            pass

    if tree is not None:
        try:
            tree_obj = self[tree]
            assert isinstance(tree_obj, Tree)
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    attrs_data = BytesIO(attrs_blob.data)
                    for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                        pattern = Pattern(pattern_bytes)
                        patterns.append((pattern, attrs))
        except (KeyError, NotTreeError):
            # Missing object or not a tree: skip tree-level attributes.
            pass

    # Read .git/info/attributes
    info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
    if os.path.exists(info_attrs_path):
        with open(info_attrs_path, "rb") as f:
            for pattern_bytes, attrs in parse_git_attributes(f):
                pattern = Pattern(pattern_bytes)
                patterns.append((pattern, attrs))

    # Read .gitattributes from working directory (if it exists)
    working_attrs_path = os.path.join(self.path, ".gitattributes")
    if os.path.exists(working_attrs_path):
        with open(working_attrs_path, "rb") as f:
            for pattern_bytes, attrs in parse_git_attributes(f):
                pattern = Pattern(pattern_bytes)
                patterns.append((pattern, attrs))

    return GitAttributes(patterns)

2168 

@replace_me(remove_in="0.26.0")
def _sparse_checkout_file_path(self) -> str:
    """Return the path of the sparse-checkout file in this repo's control dir."""
    # Deprecated: delegates to the worktree implementation.
    worktree = self.get_worktree()
    return worktree._sparse_checkout_file_path()

2173 

@replace_me(remove_in="0.26.0")
def configure_for_cone_mode(self) -> None:
    """Ensure the repository is configured for cone-mode sparse-checkout."""
    # Deprecated: delegates to the worktree implementation.
    worktree = self.get_worktree()
    return worktree.configure_for_cone_mode()

2178 

@replace_me(remove_in="0.26.0")
def infer_cone_mode(self) -> bool:
    """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
    # Deprecated: delegates to the worktree implementation.
    worktree = self.get_worktree()
    return worktree.infer_cone_mode()

2183 

@replace_me(remove_in="0.26.0")
def get_sparse_checkout_patterns(self) -> list[str]:
    """Return a list of sparse-checkout patterns from info/sparse-checkout.

    Returns:
      A list of patterns. Returns an empty list if the file is missing.
    """
    # Deprecated: delegates to the worktree implementation.
    worktree = self.get_worktree()
    return worktree.get_sparse_checkout_patterns()

2192 

@replace_me(remove_in="0.26.0")
def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
    """Write the given sparse-checkout patterns into info/sparse-checkout.

    Creates the info/ directory if it does not exist.

    Args:
      patterns: A list of gitignore-style patterns to store.
    """
    # Deprecated: delegates to the worktree implementation.
    worktree = self.get_worktree()
    return worktree.set_sparse_checkout_patterns(patterns)

2203 

@replace_me(remove_in="0.26.0")
def set_cone_mode_patterns(self, dirs: Union[Sequence[str], None] = None) -> None:
    """Write the given cone-mode directory patterns into info/sparse-checkout.

    For each directory to include, add an inclusion line that "undoes" the prior
    ``!/*/`` 'exclude' that re-includes that directory and everything under it.
    Never add the same line twice.
    """
    # Deprecated: delegates to the worktree implementation.
    worktree = self.get_worktree()
    return worktree.set_cone_mode_patterns(dirs)

2213 

2214 

2215class MemoryRepo(BaseRepo): 

2216 """Repo that stores refs, objects, and named files in memory. 

2217 

2218 MemoryRepos are always bare: they have no working tree and no index, since 

2219 those have a stronger dependency on the filesystem. 

2220 """ 

2221 

2222 filter_context: Optional["FilterContext"] 

2223 

def __init__(self) -> None:
    """Create a new repository in memory."""
    from .config import ConfigFile

    # Reflog entries are kept as plain tuples in a list.
    self._reflog: list[Any] = []
    refs = DictRefsContainer({}, logger=self._append_reflog)
    BaseRepo.__init__(self, MemoryObjectStore(), refs)
    # Control-dir files are emulated with an in-memory mapping.
    self._named_files: dict[str, bytes] = {}
    self.bare = True
    self._config = ConfigFile()
    self._description: Optional[bytes] = None
    self.filter_context = None

2236 

2237 def _append_reflog( 

2238 self, 

2239 ref: bytes, 

2240 old_sha: Optional[bytes], 

2241 new_sha: Optional[bytes], 

2242 committer: Optional[bytes], 

2243 timestamp: Optional[int], 

2244 timezone: Optional[int], 

2245 message: Optional[bytes], 

2246 ) -> None: 

2247 self._reflog.append( 

2248 (ref, old_sha, new_sha, committer, timestamp, timezone, message) 

2249 ) 

2250 

def set_description(self, description: bytes) -> None:
    """Set the description for this repository.

    Args:
      description: Text to set as description
    """
    # Stored in memory only; never written to disk.
    self._description = description

2258 

def get_description(self) -> Optional[bytes]:
    """Get the description of this repository.

    Returns:
      Repository description as bytes, or None if never set
    """
    return self._description

2266 

2267 def _determine_file_mode(self) -> bool: 

2268 """Probe the file-system to determine whether permissions can be trusted. 

2269 

2270 Returns: True if permissions can be trusted, False otherwise. 

2271 """ 

2272 return sys.platform != "win32" 

2273 

2274 def _determine_symlinks(self) -> bool: 

2275 """Probe the file-system to determine whether permissions can be trusted. 

2276 

2277 Returns: True if permissions can be trusted, False otherwise. 

2278 """ 

2279 return sys.platform != "win32" 

2280 

2281 def _put_named_file(self, path: str, contents: bytes) -> None: 

2282 """Write a file to the control dir with the given name and contents. 

2283 

2284 Args: 

2285 path: The path to the file, relative to the control dir. 

2286 contents: A string to write to the file. 

2287 """ 

2288 self._named_files[path] = contents 

2289 

2290 def _del_named_file(self, path: str) -> None: 

2291 try: 

2292 del self._named_files[path] 

2293 except KeyError: 

2294 pass 

2295 

def get_named_file(
    self,
    path: Union[str, bytes],
    basedir: Optional[str] = None,
) -> Optional[BytesIO]:
    """Get a file from the control dir with a specific name.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-baked Repo, the object returned need not be
    pointing to a file in that location.

    Args:
      path: The path to the file, relative to the control dir.
      basedir: Optional base directory for the path (unused here)
    Returns: An open file object, or None if the file does not exist.
    """
    # Keys in the backing dict are str, so decode byte paths first.
    if isinstance(path, bytes):
        key = path.decode()
    else:
        key = path
    data = self._named_files.get(key)
    if data is None:
        return None
    return BytesIO(data)

2317 

def open_index(self) -> "Index":
    """Fail to open index for this repo, since it is bare.

    Raises:
      NoIndexPresent: always, as memory repos carry no index
    """
    raise NoIndexPresent

2325 

def get_config(self) -> "ConfigFile":
    """Retrieve the config object.

    Returns: `ConfigFile` object created in ``__init__``.
    """
    return self._config

2332 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Get the appropriate rebase state manager for this repository.

    Returns: MemoryRebaseStateManager instance
    """
    from .rebase import MemoryRebaseStateManager

    # Memory repos keep rebase state in memory too.
    manager = MemoryRebaseStateManager(self)
    return manager

2341 

def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a BlobNormalizer object for checkin/checkout operations."""
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Always read configuration and attributes fresh so recent changes
    # are picked up.
    config_stack = self.get_config_stack()
    attributes = self.get_gitattributes()

    ctx = self.filter_context
    if ctx is None:
        # First use: build the registry and context lazily.
        ctx = FilterContext(FilterRegistry(config_stack, self))
        self.filter_context = ctx
    else:
        # Reuse the existing context, re-synced with the current config.
        ctx.refresh_config(config_stack)

    return FilterBlobNormalizer(config_stack, attributes, filter_context=ctx)

2362 

def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Memory repositories have no working tree and no gitattributes
    files, so the result is always empty.
    """
    from .attrs import GitAttributes

    return GitAttributes([])

2370 

def close(self) -> None:
    """Close any resources opened by this repository.

    Only the lazily-created filter context needs cleanup; the in-memory
    object store holds no OS resources.
    """
    ctx = self.filter_context
    if ctx is not None:
        ctx.close()
        self.filter_context = None

2377 

def do_commit(
    self,
    message: Optional[bytes] = None,
    committer: Optional[bytes] = None,
    author: Optional[bytes] = None,
    commit_timestamp: Optional[float] = None,
    commit_timezone: Optional[int] = None,
    author_timestamp: Optional[float] = None,
    author_timezone: Optional[int] = None,
    tree: Optional[ObjectID] = None,
    encoding: Optional[bytes] = None,
    ref: Optional[Ref] = b"HEAD",
    merge_heads: Optional[list[ObjectID]] = None,
    no_verify: bool = False,
    sign: bool = False,
) -> bytes:
    """Create a new commit.

    This is a simplified implementation for in-memory repositories that
    doesn't support worktree operations or hooks.

    Args:
      message: Commit message
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit timestamp)
      author_timezone: Author timestamp timezone (defaults to commit timezone)
      tree: SHA1 of the tree root to use
      encoding: Encoding
      ref: Optional ref to commit to (defaults to current branch).
        If None, creates a dangling commit without updating any ref.
      merge_heads: Merge heads
      no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
      sign: GPG Sign the commit (ignored for MemoryRepo)

    Raises:
      ValueError: if no tree or no message is given, or the tree is not
        a 40-byte hex sha
      CommitError: if the target ref changed concurrently

    Returns:
      New commit SHA1
    """
    import time

    from .objects import Commit

    # Unlike the on-disk Repo, there is no index to build a tree from.
    if tree is None:
        raise ValueError("tree must be specified for MemoryRepo")

    c = Commit()
    if len(tree) != 40:
        raise ValueError("tree must be a 40-byte hex sha string")
    c.tree = tree

    config = self.get_config_stack()
    if merge_heads is None:
        merge_heads = []
    if committer is None:
        # Fall back to the identity from configuration.
        committer = get_user_identity(config, kind="COMMITTER")
    check_user_identity(committer)
    c.committer = committer
    if commit_timestamp is None:
        commit_timestamp = time.time()
    c.commit_time = int(commit_timestamp)
    if commit_timezone is None:
        # Default to UTC (offset 0).
        commit_timezone = 0
    c.commit_timezone = commit_timezone
    if author is None:
        author = get_user_identity(config, kind="AUTHOR")
    c.author = author
    check_user_identity(author)
    if author_timestamp is None:
        # Author time defaults to the commit time.
        author_timestamp = commit_timestamp
    c.author_time = int(author_timestamp)
    if author_timezone is None:
        author_timezone = commit_timezone
    c.author_timezone = author_timezone
    if encoding is None:
        try:
            encoding = config.get(("i18n",), "commitEncoding")
        except KeyError:
            pass
    if encoding is not None:
        c.encoding = encoding

    # Handle message (for MemoryRepo, we don't support callable messages)
    if callable(message):
        message = message(self, c)
        if message is None:
            raise ValueError("Message callback returned None")

    if message is None:
        raise ValueError("No commit message specified")

    c.message = message

    if ref is None:
        # Create a dangling commit
        c.parents = merge_heads
        self.object_store.add_object(c)
    else:
        try:
            old_head = self.refs[ref]
            # Existing ref: new commit's parents are old head + merges.
            c.parents = [old_head, *merge_heads]
            self.object_store.add_object(c)
            # Compare-and-swap so a concurrent update is detected.
            ok = self.refs.set_if_equals(
                ref,
                old_head,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        except KeyError:
            # Ref does not exist yet: create it only if still absent.
            c.parents = merge_heads
            self.object_store.add_object(c)
            ok = self.refs.add_if_new(
                ref,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        if not ok:
            from .errors import CommitError

            raise CommitError(f"{ref!r} changed during commit")

    return c.id

2507 

@classmethod
def init_bare(
    cls,
    objects: Iterable[ShaFile],
    refs: Mapping[bytes, bytes],
    format: Optional[int] = None,
) -> "MemoryRepo":
    """Create a new bare repository in memory.

    Args:
      objects: Objects for the new repository, as iterable
      refs: Refs as dictionary, mapping names to object SHA1s
      format: Repository format version (defaults to 0)
    """
    repo = cls()
    for obj in objects:
        repo.object_store.add_object(obj)
    for refname, sha in refs.items():
        # add_if_new: never clobber a ref supplied twice.
        repo.refs.add_if_new(refname, sha)
    repo._init_files(bare=True, format=format)
    return repo