Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1093 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

32import os 

33import stat 

34import sys 

35import time 

36import warnings 

37from collections.abc import Iterable 

38from io import BytesIO 

39from typing import ( 

40 TYPE_CHECKING, 

41 Any, 

42 BinaryIO, 

43 Callable, 

44 Optional, 

45 Union, 

46) 

47 

48if TYPE_CHECKING: 

49 # There are no circular imports here, but we try to defer imports as long 

50 # as possible to reduce start-up time for anything that doesn't need 

51 # these imports. 

52 from .attrs import GitAttributes 

53 from .config import ConditionMatcher, ConfigFile, StackedConfig 

54 from .index import Index 

55 from .notes import Notes 

56 

57from .errors import ( 

58 CommitError, 

59 HookError, 

60 NoIndexPresent, 

61 NotBlobError, 

62 NotCommitError, 

63 NotGitRepository, 

64 NotTagError, 

65 NotTreeError, 

66 RefFormatError, 

67) 

68from .file import GitFile 

69from .hooks import ( 

70 CommitMsgShellHook, 

71 Hook, 

72 PostCommitShellHook, 

73 PostReceiveShellHook, 

74 PreCommitShellHook, 

75) 

76from .object_store import ( 

77 DiskObjectStore, 

78 MemoryObjectStore, 

79 MissingObjectFinder, 

80 ObjectStoreGraphWalker, 

81 PackBasedObjectStore, 

82 find_shallow, 

83 peel_sha, 

84) 

85from .objects import ( 

86 Blob, 

87 Commit, 

88 ObjectID, 

89 ShaFile, 

90 Tag, 

91 Tree, 

92 check_hexsha, 

93 valid_hexsha, 

94) 

95from .pack import generate_unpacked_objects 

96from .refs import ( 

97 ANNOTATED_TAG_SUFFIX, # noqa: F401 

98 LOCAL_BRANCH_PREFIX, 

99 LOCAL_TAG_PREFIX, # noqa: F401 

100 SYMREF, # noqa: F401 

101 DictRefsContainer, 

102 DiskRefsContainer, 

103 InfoRefsContainer, # noqa: F401 

104 Ref, 

105 RefsContainer, 

106 _set_default_branch, 

107 _set_head, 

108 _set_origin_head, 

109 check_ref_format, # noqa: F401 

110 read_packed_refs, # noqa: F401 

111 read_packed_refs_with_peeled, # noqa: F401 

112 serialize_refs, 

113 write_packed_refs, # noqa: F401 

114) 

115 

# Well-known names inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
# Worktree support files: "commondir" points a linked worktree back at the
# main control dir, "gitdir" points the other way.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created (relative to the control dir) when initializing a
# new repository.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used for HEAD in newly created repositories.
DEFAULT_BRANCH = b"master"

136 

137 

class InvalidUserIdentity(Exception):
    """Raised when a user identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        # Keep the offending identity so callers can include it in reports.
        self.identity = identity

143 

144 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined from the host."""

147 

148 

149# TODO(jelmer): Cache? 

150def _get_default_identity() -> tuple[str, str]: 

151 import socket 

152 

153 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

154 username = os.environ.get(name) 

155 if username: 

156 break 

157 else: 

158 username = None 

159 

160 try: 

161 import pwd 

162 except ImportError: 

163 fullname = None 

164 else: 

165 try: 

166 entry = pwd.getpwuid(os.getuid()) # type: ignore 

167 except KeyError: 

168 fullname = None 

169 else: 

170 if getattr(entry, "gecos", None): 

171 fullname = entry.pw_gecos.split(",")[0] 

172 else: 

173 fullname = None 

174 if username is None: 

175 username = entry.pw_name 

176 if not fullname: 

177 if username is None: 

178 raise DefaultIdentityNotFound("no username found") 

179 fullname = username 

180 email = os.environ.get("EMAIL") 

181 if email is None: 

182 if username is None: 

183 raise DefaultIdentityNotFound("no username found") 

184 email = f"{username}@{socket.gethostname()}" 

185 return (fullname, email) 

186 

187 

def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      config: Configuration stack to read user.name / user.email from.
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity of the form ``name <email>`` (bytes).
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Only probe the host for a default identity when it is actually
        # needed. The previous unconditional call could raise
        # DefaultIdentityNotFound (and did needless work) even when the
        # environment/config already supplied both name and email.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Strip surrounding angle brackets if the configured email has them.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

236 

237 

def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    # The identity must contain a " <" separator followed by a closing ">".
    name_part, separator, email_part = identity.partition(b" <")
    if not separator:
        raise InvalidUserIdentity(identity)
    if b">" not in email_part:
        raise InvalidUserIdentity(identity)
    # NUL bytes and newlines would corrupt the commit object encoding.
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)

254 

255 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts = {}
    for entry in graftpoints:
        fields = entry.split(None, 1)
        commit_sha = fields[0]
        # Everything after the commit sha (if present) is the parent list.
        parent_shas = fields[1].split() if len(fields) == 2 else []

        for sha in [commit_sha, *parent_shas]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit_sha] = parent_shas
    return grafts

287 

288 

def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    # A commit with parents serializes as "commit p1 p2 ..."; a parentless
    # commit is just its own sha. Lines are newline-joined, no trailer.
    return b"\n".join(
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    )

308 

309 

310def _set_filesystem_hidden(path) -> None: 

311 """Mark path as to be hidden if supported by platform and filesystem. 

312 

313 On win32 uses SetFileAttributesW api: 

314 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

315 """ 

316 if sys.platform == "win32": 

317 import ctypes 

318 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

319 

320 FILE_ATTRIBUTE_HIDDEN = 2 

321 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

322 ("SetFileAttributesW", ctypes.windll.kernel32) 

323 ) 

324 

325 if isinstance(path, bytes): 

326 path = os.fsdecode(path) 

327 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

328 pass # Could raise or log `ctypes.WinError()` here 

329 

330 # Could implement other platform specific filesystem hiding here 

331 

332 

class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries."""

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a parents provider.

        Args:
          store: Object store used to look up commit objects.
          grafts: Optional mapping of commit sha -> replacement parent shas.
          shallows: Optional iterable of shallow commit shas (treated as
            having no parents).
        """
        self.store = store
        # BUGFIX: previously `grafts={}` / `shallows=[]` were mutable default
        # arguments; the grafts dict in particular was stored by reference,
        # so mutating one provider's grafts leaked into every other instance
        # created with the default.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set() if shallows is None else set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the parent shas for ``commit_id``.

        Grafts take precedence; shallow commits report no parents. The
        commit graph (if any) is consulted before falling back to reading
        the commit object from the store.

        Args:
          commit_id: SHA of the commit to look up.
          commit: Optional already-loaded commit object matching commit_id.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents

360 

361 

362class BaseRepo: 

363 """Base class for a git repository. 

364 

365 This base class is meant to be used for Repository implementations that e.g. 

366 work on top of a different transport than a standard filesystem path. 

367 

368 Attributes: 

369 object_store: Dictionary-like object for accessing 

370 the objects 

371 refs: Dictionary-like object with the refs in this 

372 repository 

373 """ 

374 

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Grafts and hooks start empty; subclasses populate them (e.g. from
        # the graft file / hooks directory) after construction.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        self.hooks: dict[str, Hook] = {}

390 

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract; concrete subclasses (e.g. an on-disk Repo) must override.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)

397 

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour: assume symlinks work
        # everywhere except Windows.
        return sys.platform != "win32"

405 

406 def _init_files( 

407 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None 

408 ) -> None: 

409 """Initialize a default set of named files.""" 

410 from .config import ConfigFile 

411 

412 self._put_named_file("description", b"Unnamed repository") 

413 f = BytesIO() 

414 cf = ConfigFile() 

415 if format is None: 

416 format = 0 

417 if format not in (0, 1): 

418 raise ValueError(f"Unsupported repository format version: {format}") 

419 cf.set("core", "repositoryformatversion", str(format)) 

420 if self._determine_file_mode(): 

421 cf.set("core", "filemode", True) 

422 else: 

423 cf.set("core", "filemode", False) 

424 

425 if symlinks is None and not bare: 

426 symlinks = self._determine_symlinks() 

427 

428 if symlinks is False: 

429 cf.set("core", "symlinks", symlinks) 

430 

431 cf.set("core", "bare", bare) 

432 cf.set("core", "logallrefupdates", True) 

433 cf.write_to_file(f) 

434 self._put_named_file("config", f.getvalue()) 

435 self._put_named_file(os.path.join("info", "exclude"), b"") 

436 

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Abstract; concrete subclasses must override.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

449 

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract; concrete subclasses must override.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

458 

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract; concrete subclasses must override.

        Args:
          path: The path to the file, relative to the control dir.
        """
        raise NotImplementedError(self._del_named_file)

462 

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract; concrete subclasses must override.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

471 

472 def fetch( 

473 self, target, determine_wants=None, progress=None, depth: Optional[int] = None 

474 ): 

475 """Fetch objects into another repository. 

476 

477 Args: 

478 target: The target repository 

479 determine_wants: Optional function to determine what refs to 

480 fetch. 

481 progress: Optional progress function 

482 depth: Optional shallow fetch depth 

483 Returns: The local refs 

484 """ 

485 if determine_wants is None: 

486 determine_wants = target.object_store.determine_wants_all 

487 count, pack_data = self.fetch_pack_data( 

488 determine_wants, 

489 target.get_graph_walker(), 

490 progress=progress, 

491 depth=depth, 

492 ) 

493 target.object_store.add_pack_data(count, pack_data, progress) 

494 return self.get_refs() 

495 

496 def fetch_pack_data( 

497 self, 

498 determine_wants, 

499 graph_walker, 

500 progress, 

501 *, 

502 get_tagged=None, 

503 depth: Optional[int] = None, 

504 ): 

505 """Fetch the pack data required for a set of revisions. 

506 

507 Args: 

508 determine_wants: Function that takes a dictionary with heads 

509 and returns the list of heads to fetch. 

510 graph_walker: Object that can iterate over the list of revisions 

511 to fetch and has an "ack" method that will be called to acknowledge 

512 that a revision is present. 

513 progress: Simple progress function that will be called with 

514 updated progress strings. 

515 get_tagged: Function that returns a dict of pointed-to sha -> 

516 tag sha for including tags. 

517 depth: Shallow fetch depth 

518 Returns: count and iterator over pack data 

519 """ 

520 missing_objects = self.find_missing_objects( 

521 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

522 ) 

523 if missing_objects is None: 

524 return 0, iter([]) 

525 remote_has = missing_objects.get_remote_has() 

526 object_ids = list(missing_objects) 

527 return len(object_ids), generate_unpacked_objects( 

528 self.object_store, object_ids, progress=progress, other_haves=remote_has 

529 ) 

530 

    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        # Present the peeled refs to the client so it can pick what it wants.
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot of the walker's shallow set before this negotiation.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): if depth is set but the walker has no `shallow`
            # attribute, `unshallow` is left unbound for the checks below —
            # confirm callers always pass a shallow-capable walker here.
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Stand-in that satisfies the MissingObjectFinder interface but
            # yields nothing.
            class DummyMissingObjectFinder:
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            # Adapt ParentsProvider to the (commit) -> parents signature
            # expected by MissingObjectFinder.
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

621 

622 def generate_pack_data( 

623 self, 

624 have: list[ObjectID], 

625 want: list[ObjectID], 

626 progress: Optional[Callable[[str], None]] = None, 

627 ofs_delta: Optional[bool] = None, 

628 ): 

629 """Generate pack data objects for a set of wants/haves. 

630 

631 Args: 

632 have: List of SHA1s of objects that should not be sent 

633 want: List of SHA1s of objects that should be sent 

634 ofs_delta: Whether OFS deltas can be included 

635 progress: Optional progress reporting method 

636 """ 

637 return self.object_store.generate_pack_data( 

638 have, 

639 want, 

640 shallow=self.get_shallow(), 

641 progress=progress, 

642 ofs_delta=ofs_delta, 

643 ) 

644 

645 def get_graph_walker( 

646 self, heads: Optional[list[ObjectID]] = None 

647 ) -> ObjectStoreGraphWalker: 

648 """Retrieve a graph walker. 

649 

650 A graph walker is used by a remote repository (or proxy) 

651 to find out which objects are present in this repository. 

652 

653 Args: 

654 heads: Repository heads to use (optional) 

655 Returns: A graph walker object 

656 """ 

657 if heads is None: 

658 heads = [ 

659 sha 

660 for sha in self.refs.as_dict(b"refs/heads").values() 

661 if sha in self.object_store 

662 ] 

663 parents_provider = ParentsProvider(self.object_store) 

664 return ObjectStoreGraphWalker( 

665 heads, 

666 parents_provider.get_parents, 

667 shallow=self.get_shallow(), 

668 update_shallow=self.update_shallow, 

669 ) 

670 

    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

677 

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD.

        Raises:
          KeyError: if HEAD is not set or cannot be resolved.
        """
        return self.refs[b"HEAD"]

681 

682 def _get_object(self, sha, cls): 

683 assert len(sha) in (20, 40) 

684 ret = self.get_object(sha) 

685 if not isinstance(ret, cls): 

686 if cls is Commit: 

687 raise NotCommitError(ret) 

688 elif cls is Blob: 

689 raise NotBlobError(ret) 

690 elif cls is Tree: 

691 raise NotTreeError(ret) 

692 elif cls is Tag: 

693 raise NotTagError(ret) 

694 else: 

695 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

696 return ret 

697 

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

708 

    def parents_provider(self) -> ParentsProvider:
        """Return a ParentsProvider wired to this repository.

        The provider honours this repository's graftpoints and shallow set.
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

715 

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

728 

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract; concrete subclasses must override.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

735 

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract; concrete subclasses must override.
        """
        raise NotImplementedError(self.get_worktree_config)

739 

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Abstract; concrete subclasses must override.

        Returns: String with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)

747 

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Abstract; concrete subclasses must override.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

755 

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Abstract; concrete subclasses must override.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)

762 

    def get_blob_normalizer(self):
        """Return a BlobNormalizer object for checkin/checkout operations.

        Abstract; concrete subclasses must override.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)

769 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Abstract; concrete subclasses must override.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)

780 

781 def get_config_stack(self) -> "StackedConfig": 

782 """Return a config stack for this repository. 

783 

784 This stack accesses the configuration for both this repository 

785 itself (.git/config) and the global configuration, which usually 

786 lives in ~/.gitconfig. 

787 

788 Returns: `Config` instance for this repository 

789 """ 

790 from .config import ConfigFile, StackedConfig 

791 

792 local_config = self.get_config() 

793 backends: list[ConfigFile] = [local_config] 

794 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

795 backends.append(self.get_worktree_config()) 

796 

797 backends += StackedConfig.default_backends() 

798 return StackedConfig(backends, writable=local_config) 

799 

800 def get_shallow(self) -> set[ObjectID]: 

801 """Get the set of shallow commits. 

802 

803 Returns: Set of shallow commits. 

804 """ 

805 f = self.get_named_file("shallow") 

806 if f is None: 

807 return set() 

808 with f: 

809 return {line.strip() for line in f} 

810 

811 def update_shallow(self, new_shallow, new_unshallow) -> None: 

812 """Update the list of shallow objects. 

813 

814 Args: 

815 new_shallow: Newly shallow objects 

816 new_unshallow: Newly no longer shallow objects 

817 """ 

818 shallow = self.get_shallow() 

819 if new_shallow: 

820 shallow.update(new_shallow) 

821 if new_unshallow: 

822 shallow.difference_update(new_unshallow) 

823 if shallow: 

824 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

825 else: 

826 self._del_named_file("shallow") 

827 

828 def get_peeled(self, ref: Ref) -> ObjectID: 

829 """Get the peeled value of a ref. 

830 

831 Args: 

832 ref: The refname to peel. 

833 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

834 intermediate tags; if the original ref does not point to a tag, 

835 this will equal the original SHA1. 

836 """ 

837 cached = self.refs.get_peeled(ref) 

838 if cached is not None: 

839 return cached 

840 return peel_sha(self.object_store, self.refs[ref])[1].id 

841 

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        # Imported lazily to keep module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)

852 

853 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs): 

854 """Obtain a walker for this repository. 

855 

856 Args: 

857 include: Iterable of SHAs of commits to include along with their 

858 ancestors. Defaults to [HEAD] 

859 

860 Keyword Args: 

861 exclude: Iterable of SHAs of commits to exclude along with their 

862 ancestors, overriding includes. 

863 order: ORDER_* constant specifying the order of results. 

864 Anything other than ORDER_DATE may result in O(n) memory usage. 

865 reverse: If True, reverse the order of output, requiring O(n) 

866 memory. 

867 max_entries: The maximum number of entries to yield, or None for 

868 no limit. 

869 paths: Iterable of file or subtree paths to show entries for. 

870 rename_detector: diff.RenameDetector object for detecting 

871 renames. 

872 follow: If True, follow path across renames/copies. Forces a 

873 default rename_detector. 

874 since: Timestamp to list commits after. 

875 until: Timestamp to list commits before. 

876 queue_cls: A class to use for a queue of commits, supporting the 

877 iterator protocol. The constructor takes a single argument, the 

878 Walker. 

879 

880 Returns: A `Walker` object 

881 """ 

882 from .walk import Walker 

883 

884 if include is None: 

885 include = [self.head()] 

886 

887 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit) 

888 

889 return Walker(self.object_store, include, **kwargs) 

890 

891 def __getitem__(self, name: Union[ObjectID, Ref]): 

892 """Retrieve a Git object by SHA1 or ref. 

893 

894 Args: 

895 name: A Git object SHA1 or a ref name 

896 Returns: A `ShaFile` object, such as a Commit or Blob 

897 Raises: 

898 KeyError: when the specified ref or object does not exist 

899 """ 

900 if not isinstance(name, bytes): 

901 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

902 if len(name) in (20, 40): 

903 try: 

904 return self.object_store[name] 

905 except (KeyError, ValueError): 

906 pass 

907 try: 

908 return self.object_store[self.refs[name]] 

909 except RefFormatError as exc: 

910 raise KeyError(name) from exc 

911 

912 def __contains__(self, name: bytes) -> bool: 

913 """Check if a specific Git object or ref is present. 

914 

915 Args: 

916 name: Git object SHA1 or ref name 

917 """ 

918 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

919 return name in self.object_store or name in self.refs 

920 else: 

921 return name in self.refs 

922 

923 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

924 """Set a ref. 

925 

926 Args: 

927 name: ref name 

928 value: Ref value - either a ShaFile object, or a hex sha 

929 """ 

930 if name.startswith(b"refs/") or name == b"HEAD": 

931 if isinstance(value, ShaFile): 

932 self.refs[name] = value.id 

933 elif isinstance(value, bytes): 

934 self.refs[name] = value 

935 else: 

936 raise TypeError(value) 

937 else: 

938 raise ValueError(name) 

939 

940 def __delitem__(self, name: bytes) -> None: 

941 """Remove a ref. 

942 

943 Args: 

944 name: Name of the ref to remove 

945 """ 

946 if name.startswith(b"refs/") or name == b"HEAD": 

947 del self.refs[name] 

948 else: 

949 raise ValueError(name) 

950 

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated: use the module-level :func:`get_user_identity` instead.

        Args:
          config: Configuration stack to read the identity from.
          kind: Accepted for signature compatibility; note it is NOT
            forwarded to get_user_identity() below.
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

960 

961 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None: 

962 """Add or modify graftpoints. 

963 

964 Args: 

965 updated_graftpoints: Dict of commit shas to list of parent shas 

966 """ 

967 # Simple validation 

968 for commit, parents in updated_graftpoints.items(): 

969 for sha in [commit, *parents]: 

970 check_hexsha(sha, "Invalid graftpoint") 

971 

972 self._graftpoints.update(updated_graftpoints) 

973 

974 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None: 

975 """Remove graftpoints. 

976 

977 Args: 

978 to_remove: List of commit shas 

979 """ 

980 for sha in to_remove: 

981 del self._graftpoints[sha] 

982 

983 def _read_heads(self, name): 

984 f = self.get_named_file(name) 

985 if f is None: 

986 return [] 

987 with f: 

988 return [line.strip() for line in f.readlines() if line.strip()] 

989 

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        # Run the pre-commit hook first; a failing hook aborts the commit,
        # while a missing hook is simply skipped.
        try:
            if not no_verify:
                self.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        # Either commit the current index as a tree, or use the caller's
        # explicit tree sha (validated to be a 40-char hex string).
        if tree is None:
            index = self.open_index()
            c.tree = index.commit(self.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = self._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit.
        # NOTE(review): the declared default is sign=False, so the
        # `sign is None` branch below only runs when a caller passes
        # sign=None explicitly — confirm this asymmetry is intended.
        should_sign = sign
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            config = self.get_config_stack()
            try:
                should_sign = config.get_boolean((b"commit",), b"gpgSign")
            except KeyError:
                should_sign = False  # Default to not signing if no config
        keyid = sign if isinstance(sign, str) else None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                c.parents = merge_heads

        # Handle message after parents are set, so a message callback can
        # inspect the fully-populated commit object.
        if callable(original_message):
            message = original_message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # The commit-msg hook may rewrite the message; None means "unchanged".
        try:
            if no_verify:
                c.message = message
            else:
                c.message = self.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self.object_store.add_object(c)
        else:
            # Update the ref atomically: compare-and-swap against the head we
            # used as parent, or add-if-new when the ref did not exist yet.
            try:
                old_head = self.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._del_named_file("MERGE_HEAD")

        # post-commit failures only warn; the commit itself already succeeded.
        try:
            self.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self)

        return c.id

1189 

1190 

def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Everything after the prefix, minus any trailing newlines, is the path.
    return contents[len(prefix):].rstrip("\n")

1204 

1205 

class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        # The unsupported repositoryformatversion value that was encountered;
        # exposed so callers can report it.
        self.version = version

1211 

1212 

class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        # The unrecognized extensions.* name (or "name = value" detail)
        # that triggered the error; exposed so callers can report it.
        self.extension = extension

1218 

1219 

1220class Repo(BaseRepo): 

1221 """A git repository backed by local disk. 

1222 

1223 To open an existing repository, call the constructor with 

1224 the path of the repository. 

1225 

1226 To create a new repository, use the Repo.init class method. 

1227 

1228 Note that a repository object may hold on to resources such 

1229 as file handles for performance reasons; call .close() to free 

1230 up those resources. 

1231 

1232 Attributes: 

1233 path: Path to the working copy (if it exists) or repository control 

1234 directory (if the repository is bare) 

1235 bare: Whether this is a bare repository 

1236 """ 

1237 

1238 path: str 

1239 bare: bool 

1240 

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository; autodetected when None.
        Raises:
          NotGitRepository: If root does not look like a git repository
          UnsupportedVersion: If core.repositoryformatversion is not 0 or 1
          UnsupportedExtension: If an unknown extensions.* setting is present
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        # Autodetect bare-ness: a .git file/dir means non-bare; objects/ and
        # refs/ directly under root means bare; otherwise not a repository.
        if bare is None:
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            # A .git *file* points at the real control directory (worktrees,
            # submodules); a .git *directory* is the control dir itself.
            if os.path.isfile(hidden_path):
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # "commondir" redirects shared state to the main worktree's control
        # directory when this is a linked working tree.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter; only refStorage=reftable and
        # worktreeConfig are understood, anything else is rejected.
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured (replaces the
        # DiskRefsContainer created above)
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

1352 

1353 def _write_reflog( 

1354 self, ref, old_sha, new_sha, committer, timestamp, timezone, message 

1355 ) -> None: 

1356 from .reflog import format_reflog_line 

1357 

1358 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1359 try: 

1360 os.makedirs(os.path.dirname(path)) 

1361 except FileExistsError: 

1362 pass 

1363 if committer is None: 

1364 config = self.get_config_stack() 

1365 committer = get_user_identity(config) 

1366 check_user_identity(committer) 

1367 if timestamp is None: 

1368 timestamp = int(time.time()) 

1369 if timezone is None: 

1370 timezone = 0 # FIXME 

1371 with open(path, "ab") as f: 

1372 f.write( 

1373 format_reflog_line( 

1374 old_sha, new_sha, committer, timestamp, timezone, message 

1375 ) 

1376 + b"\n" 

1377 ) 

1378 

1379 def read_reflog(self, ref): 

1380 """Read reflog entries for a reference. 

1381 

1382 Args: 

1383 ref: Reference name (e.g. b'HEAD', b'refs/heads/master') 

1384 

1385 Yields: 

1386 reflog.Entry objects in chronological order (oldest first) 

1387 """ 

1388 from .reflog import read_reflog 

1389 

1390 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1391 try: 

1392 with open(path, "rb") as f: 

1393 yield from read_reflog(f) 

1394 except FileNotFoundError: 

1395 return 

1396 

1397 @classmethod 

1398 def discover(cls, start="."): 

1399 """Iterate parent directories to discover a repository. 

1400 

1401 Return a Repo object for the first parent directory that looks like a 

1402 Git repository. 

1403 

1404 Args: 

1405 start: The directory to start discovery from (defaults to '.') 

1406 """ 

1407 remaining = True 

1408 path = os.path.abspath(start) 

1409 while remaining: 

1410 try: 

1411 return cls(path) 

1412 except NotGitRepository: 

1413 path, remaining = os.path.split(path) 

1414 raise NotGitRepository( 

1415 "No git repository was found at {path}".format(**dict(path=start)) 

1416 ) 

1417 

    def controldir(self):
        """Return the path of the control directory.

        For a non-bare repository this is the ``.git`` directory (or the
        directory a ``.git`` file points at); for a bare repository it is the
        repository root itself.
        """
        return self._controldir

1421 

    def commondir(self):
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.

        Shared state (objects, refs, grafts, shallow info) lives under this
        directory rather than under controldir().
        """
        return self._commondir

1431 

1432 def _determine_file_mode(self): 

1433 """Probe the file-system to determine whether permissions can be trusted. 

1434 

1435 Returns: True if permissions can be trusted, False otherwise. 

1436 """ 

1437 fname = os.path.join(self.path, ".probe-permissions") 

1438 with open(fname, "w") as f: 

1439 f.write("") 

1440 

1441 st1 = os.lstat(fname) 

1442 try: 

1443 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1444 except PermissionError: 

1445 return False 

1446 st2 = os.lstat(fname) 

1447 

1448 os.unlink(fname) 

1449 

1450 mode_differs = st1.st_mode != st2.st_mode 

1451 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1452 

1453 return mode_differs and st2_has_exec 

1454 

1455 def _determine_symlinks(self): 

1456 """Probe the filesystem to determine whether symlinks can be created. 

1457 

1458 Returns: True if symlinks can be created, False otherwise. 

1459 """ 

1460 # TODO(jelmer): Actually probe disk / look at filesystem 

1461 return sys.platform != "win32" 

1462 

1463 def _put_named_file(self, path, contents) -> None: 

1464 """Write a file to the control dir with the given name and contents. 

1465 

1466 Args: 

1467 path: The path to the file, relative to the control dir. 

1468 contents: A string to write to the file. 

1469 """ 

1470 path = path.lstrip(os.path.sep) 

1471 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1472 f.write(contents) 

1473 

1474 def _del_named_file(self, path) -> None: 

1475 try: 

1476 os.unlink(os.path.join(self.controldir(), path)) 

1477 except FileNotFoundError: 

1478 return 

1479 

1480 def get_named_file(self, path, basedir=None): 

1481 """Get a file from the control dir with a specific name. 

1482 

1483 Although the filename should be interpreted as a filename relative to 

1484 the control dir in a disk-based Repo, the object returned need not be 

1485 pointing to a file in that location. 

1486 

1487 Args: 

1488 path: The path to the file, relative to the control dir. 

1489 basedir: Optional argument that specifies an alternative to the 

1490 control dir. 

1491 Returns: An open file object, or None if the file does not exist. 

1492 """ 

1493 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1494 # the dumb web serving code. 

1495 if basedir is None: 

1496 basedir = self.controldir() 

1497 path = path.lstrip(os.path.sep) 

1498 try: 

1499 return open(os.path.join(basedir, path), "rb") 

1500 except FileNotFoundError: 

1501 return None 

1502 

    def index_path(self):
        """Return path to the index file.

        Note this is relative to the control directory, not the common
        directory, so each linked worktree has its own index.
        """
        return os.path.join(self.controldir(), INDEX_FILENAME)

1506 

1507 def open_index(self) -> "Index": 

1508 """Open the index for this repository. 

1509 

1510 Raises: 

1511 NoIndexPresent: If no index is present 

1512 Returns: The matching `Index` 

1513 """ 

1514 from .index import Index 

1515 

1516 if not self.has_index(): 

1517 raise NoIndexPresent 

1518 

1519 # Check for manyFiles feature configuration 

1520 config = self.get_config_stack() 

1521 many_files = config.get_boolean(b"feature", b"manyFiles", False) 

1522 skip_hash = False 

1523 index_version = None 

1524 

1525 if many_files: 

1526 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true 

1527 try: 

1528 index_version_str = config.get(b"index", b"version") 

1529 index_version = int(index_version_str) 

1530 except KeyError: 

1531 index_version = 4 # Default to version 4 for manyFiles 

1532 skip_hash = config.get_boolean(b"index", b"skipHash", True) 

1533 else: 

1534 # Check for explicit index settings 

1535 try: 

1536 index_version_str = config.get(b"index", b"version") 

1537 index_version = int(index_version_str) 

1538 except KeyError: 

1539 index_version = None 

1540 skip_hash = config.get_boolean(b"index", b"skipHash", False) 

1541 

1542 return Index(self.index_path(), skip_hash=skip_hash, version=index_version) 

1543 

    def has_index(self) -> bool:
        """Check if an index is present.

        Returns: True for non-bare repositories, False for bare ones.
        """
        # Bare repos must never have index files; non-bare repos may have a
        # missing index file, which is treated as empty.
        return not self.bare

1549 

    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        Args:
          fs_paths: List of paths, relative to the repository path
        Raises:
          ValueError: If a given path is absolute
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path as a convenience; normalize to a list.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self.open_index()
        blob_normalizer = self.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except OSError:
                # File no longer exists on disk: stage the deletion.
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    # Directories are only staged when index_entry_from_directory
                    # produces an entry (presumably a submodule — TODO confirm);
                    # otherwise any stale index entry is dropped.
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Sockets, fifos etc. cannot be tracked: drop the entry.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    # Regular file or symlink: normalize (e.g. line endings),
                    # store the blob, and record its stat info in the index.
                    blob = blob_from_path_and_stat(full_path, st)
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()

1615 

    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific files in the index.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
        Raises:
          KeyError: If a path is neither in HEAD's tree nor in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self.open_index()
        try:
            tree_id = self[b"HEAD"].tree
        except KeyError:
            # no head mean no commit in the repo: unstaging simply removes
            # the entries from the index
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # Stat the working-tree file if it still exists; its dev/ino/uid/gid
            # are carried into the rebuilt index entry below.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            # Reset the index entry to the committed (HEAD) version: mode and
            # sha come from the tree entry, timestamps from the HEAD commit.
            index_entry = IndexEntry(
                ctime=(self[b"HEAD"].commit_time, 0),
                mtime=(self[b"HEAD"].commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self[tree_entry[1]].data),
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

1672 

    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        # Outer try: remove the target directory we created on any failure.
        # Inner try: close the target repo on any failure after init.
        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Point the new repo's "origin" remote back at this repository.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror branches under refs/remotes/<origin>/ and copy tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                    head_ref = _set_default_branch(
                        target.refs, origin, origin_head, branch, ref_message
                    )

                    # Update target head
                    if head_ref:
                        head = _set_head(target.refs, head_ref, ref_message)
                    else:
                        head = None

                    if checkout and head is not None:
                        target.reset_index()
            except BaseException:
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

1767 

1768 def reset_index(self, tree: Optional[bytes] = None): 

1769 """Reset the index back to a specific tree. 

1770 

1771 Args: 

1772 tree: Tree SHA to reset to, None for current HEAD tree. 

1773 """ 

1774 from .index import ( 

1775 build_index_from_tree, 

1776 symlink, 

1777 validate_path_element_default, 

1778 validate_path_element_hfs, 

1779 validate_path_element_ntfs, 

1780 ) 

1781 

1782 if tree is None: 

1783 head = self[b"HEAD"] 

1784 if isinstance(head, Tag): 

1785 _cls, obj = head.object 

1786 head = self.get_object(obj) 

1787 tree = head.tree 

1788 config = self.get_config() 

1789 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt") 

1790 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"): 

1791 validate_path_element = validate_path_element_ntfs 

1792 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"): 

1793 validate_path_element = validate_path_element_hfs 

1794 else: 

1795 validate_path_element = validate_path_element_default 

1796 if config.get_boolean(b"core", b"symlinks", True): 

1797 symlink_fn = symlink 

1798 else: 

1799 

1800 def symlink_fn(source, target) -> None: # type: ignore 

1801 with open( 

1802 target, "w" + ("b" if isinstance(source, bytes) else "") 

1803 ) as f: 

1804 f.write(source) 

1805 

1806 blob_normalizer = self.get_blob_normalizer() 

1807 return build_index_from_tree( 

1808 self.path, 

1809 self.index_path(), 

1810 self.object_store, 

1811 tree, 

1812 honor_filemode=honor_filemode, 

1813 validate_path_element=validate_path_element, 

1814 symlink_fn=symlink_fn, 

1815 blob_normalizer=blob_normalizer, 

1816 ) 

1817 

    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Matcher for "gitdir:" / "gitdir/i:" conditions: compares the
        # resolved control directory against a git-style glob pattern.
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything under that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Matcher for "onbranch:" conditions: matches the currently
        # checked-out branch name against a glob pattern.
        def match_onbranch(pattern: str) -> bool:
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # Detached or unreadable HEAD: no branch to match.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers

1886 

1887 def get_worktree_config(self) -> "ConfigFile": 

1888 from .config import ConfigFile 

1889 

1890 path = os.path.join(self.commondir(), "config.worktree") 

1891 try: 

1892 # Pass condition matchers for includeIf evaluation 

1893 condition_matchers = self._get_config_condition_matchers() 

1894 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1895 except FileNotFoundError: 

1896 cf = ConfigFile() 

1897 cf.path = path 

1898 return cf 

1899 

1900 def get_config(self) -> "ConfigFile": 

1901 """Retrieve the config object. 

1902 

1903 Returns: `ConfigFile` object for the ``.git/config`` file. 

1904 """ 

1905 from .config import ConfigFile 

1906 

1907 path = os.path.join(self._commondir, "config") 

1908 try: 

1909 # Pass condition matchers for includeIf evaluation 

1910 condition_matchers = self._get_config_condition_matchers() 

1911 return ConfigFile.from_path(path, condition_matchers=condition_matchers) 

1912 except FileNotFoundError: 

1913 ret = ConfigFile() 

1914 ret.path = path 

1915 return ret 

1916 

1917 def get_rebase_state_manager(self): 

1918 """Get the appropriate rebase state manager for this repository. 

1919 

1920 Returns: DiskRebaseStateManager instance 

1921 """ 

1922 import os 

1923 

1924 from .rebase import DiskRebaseStateManager 

1925 

1926 path = os.path.join(self.controldir(), "rebase-merge") 

1927 return DiskRebaseStateManager(path) 

1928 

1929 def get_description(self): 

1930 """Retrieve the description of this repository. 

1931 

1932 Returns: A string describing the repository or None. 

1933 """ 

1934 path = os.path.join(self._controldir, "description") 

1935 try: 

1936 with GitFile(path, "rb") as f: 

1937 return f.read() 

1938 except FileNotFoundError: 

1939 return None 

1940 

1941 def __repr__(self) -> str: 

1942 return f"<Repo at {self.path!r}>" 

1943 

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text (bytes) to set as description for this repository.
        """
        self._put_named_file("description", description)

1951 

    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Creates the base control-directory layout, the object store, the
        initial symbolic HEAD, and the standard control files.

        Args:
          path: Repository root (working tree, or control dir when bare)
          controldir: Control directory to populate
          bare: Whether the repository is bare
          object_store: Object store to use; created on disk when None
          config: Config to read init.defaultBranch from (stacked default
            config is used when None)
          default_branch: Initial branch name; overrides config lookup
          symlinks: Symlink support setting passed through to _init_files
          format: Repository format version passed through to _init_files
        Returns: The opened `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # Point HEAD at the (unborn) initial branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret

1987 

1988 @classmethod 

1989 def init( 

1990 cls, 

1991 path: Union[str, bytes, os.PathLike], 

1992 *, 

1993 mkdir: bool = False, 

1994 config=None, 

1995 default_branch=None, 

1996 symlinks: Optional[bool] = None, 

1997 format: Optional[int] = None, 

1998 ) -> "Repo": 

1999 """Create a new repository. 

2000 

2001 Args: 

2002 path: Path in which to create the repository 

2003 mkdir: Whether to create the directory 

2004 format: Repository format version (defaults to 0) 

2005 Returns: `Repo` instance 

2006 """ 

2007 path = os.fspath(path) 

2008 if isinstance(path, bytes): 

2009 path = os.fsdecode(path) 

2010 if mkdir: 

2011 os.mkdir(path) 

2012 controldir = os.path.join(path, CONTROLDIR) 

2013 os.mkdir(controldir) 

2014 _set_filesystem_hidden(controldir) 

2015 return cls._init_maybe_bare( 

2016 path, 

2017 controldir, 

2018 False, 

2019 config=config, 

2020 default_branch=default_branch, 

2021 symlinks=symlinks, 

2022 format=format, 

2023 ) 

2024 

2025 @classmethod 

2026 def _init_new_working_directory( 

2027 cls, 

2028 path: Union[str, bytes, os.PathLike], 

2029 main_repo, 

2030 identifier=None, 

2031 mkdir=False, 

2032 ): 

2033 """Create a new working directory linked to a repository. 

2034 

2035 Args: 

2036 path: Path in which to create the working tree. 

2037 main_repo: Main repository to reference 

2038 identifier: Worktree identifier 

2039 mkdir: Whether to create the directory 

2040 Returns: `Repo` instance 

2041 """ 

2042 path = os.fspath(path) 

2043 if isinstance(path, bytes): 

2044 path = os.fsdecode(path) 

2045 if mkdir: 

2046 os.mkdir(path) 

2047 if identifier is None: 

2048 identifier = os.path.basename(path) 

2049 main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES) 

2050 worktree_controldir = os.path.join(main_worktreesdir, identifier) 

2051 gitdirfile = os.path.join(path, CONTROLDIR) 

2052 with open(gitdirfile, "wb") as f: 

2053 f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n") 

2054 try: 

2055 os.mkdir(main_worktreesdir) 

2056 except FileExistsError: 

2057 pass 

2058 try: 

2059 os.mkdir(worktree_controldir) 

2060 except FileExistsError: 

2061 pass 

2062 with open(os.path.join(worktree_controldir, GITDIR), "wb") as f: 

2063 f.write(os.fsencode(gitdirfile) + b"\n") 

2064 with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f: 

2065 f.write(b"../..\n") 

2066 with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f: 

2067 f.write(main_repo.head() + b"\n") 

2068 r = cls(path) 

2069 r.reset_index() 

2070 return r 

2071 

2072 @classmethod 

2073 def init_bare( 

2074 cls, 

2075 path: Union[str, bytes, os.PathLike], 

2076 *, 

2077 mkdir=False, 

2078 object_store=None, 

2079 config=None, 

2080 default_branch=None, 

2081 format: Optional[int] = None, 

2082 ): 

2083 """Create a new bare repository. 

2084 

2085 ``path`` should already exist and be an empty directory. 

2086 

2087 Args: 

2088 path: Path to create bare repository in 

2089 format: Repository format version (defaults to 0) 

2090 Returns: a `Repo` instance 

2091 """ 

2092 path = os.fspath(path) 

2093 if isinstance(path, bytes): 

2094 path = os.fsdecode(path) 

2095 if mkdir: 

2096 os.mkdir(path) 

2097 return cls._init_maybe_bare( 

2098 path, 

2099 path, 

2100 True, 

2101 object_store=object_store, 

2102 config=config, 

2103 default_branch=default_branch, 

2104 format=format, 

2105 ) 

2106 

2107 create = init_bare 

2108 

2109 def close(self) -> None: 

2110 """Close any files opened by this repository.""" 

2111 self.object_store.close() 

2112 

2113 def __enter__(self): 

2114 return self 

2115 

2116 def __exit__(self, exc_type, exc_val, exc_tb): 

2117 self.close() 

2118 

2119 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

2120 """Read .gitattributes file from working tree. 

2121 

2122 Returns: 

2123 Dictionary mapping file patterns to attributes 

2124 """ 

2125 gitattributes = {} 

2126 gitattributes_path = os.path.join(self.path, ".gitattributes") 

2127 

2128 if os.path.exists(gitattributes_path): 

2129 with open(gitattributes_path, "rb") as f: 

2130 for line in f: 

2131 line = line.strip() 

2132 if not line or line.startswith(b"#"): 

2133 continue 

2134 

2135 parts = line.split() 

2136 if len(parts) < 2: 

2137 continue 

2138 

2139 pattern = parts[0] 

2140 attrs = {} 

2141 

2142 for attr in parts[1:]: 

2143 if attr.startswith(b"-"): 

2144 # Unset attribute 

2145 attrs[attr[1:]] = b"false" 

2146 elif b"=" in attr: 

2147 # Set to value 

2148 key, value = attr.split(b"=", 1) 

2149 attrs[key] = value 

2150 else: 

2151 # Set attribute 

2152 attrs[attr] = b"true" 

2153 

2154 gitattributes[pattern] = attrs 

2155 

2156 return gitattributes 

2157 

2158 def get_blob_normalizer(self): 

2159 """Return a BlobNormalizer object.""" 

2160 from .filters import FilterBlobNormalizer, FilterRegistry 

2161 

2162 # Get proper GitAttributes object 

2163 git_attributes = self.get_gitattributes() 

2164 config_stack = self.get_config_stack() 

2165 

2166 # Create FilterRegistry with repo reference 

2167 filter_registry = FilterRegistry(config_stack, self) 

2168 

2169 # Return FilterBlobNormalizer which handles all filters including line endings 

2170 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

2171 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from: the .gitattributes blob in
        the given tree (or HEAD), then .git/info/attributes, then the
        .gitattributes file in the working directory.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Annotated tag: peel it to reach the tagged object.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                if b".gitattributes" in tree_obj:
                    # Tree entries are (mode, sha) pairs; only the sha is needed.
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing object or a non-tree SHA: no tree-level attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)

2236 

2237 def _sparse_checkout_file_path(self) -> str: 

2238 """Return the path of the sparse-checkout file in this repo's control dir.""" 

2239 return os.path.join(self.controldir(), "info", "sparse-checkout") 

2240 

2241 def configure_for_cone_mode(self) -> None: 

2242 """Ensure the repository is configured for cone-mode sparse-checkout.""" 

2243 config = self.get_config() 

2244 config.set((b"core",), b"sparseCheckout", b"true") 

2245 config.set((b"core",), b"sparseCheckoutCone", b"true") 

2246 config.write_to_path() 

2247 

2248 def infer_cone_mode(self) -> bool: 

2249 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.""" 

2250 config = self.get_config() 

2251 try: 

2252 sc_cone = config.get((b"core",), b"sparseCheckoutCone") 

2253 return sc_cone == b"true" 

2254 except KeyError: 

2255 # If core.sparseCheckoutCone is not set, default to False 

2256 return False 

2257 

2258 def get_sparse_checkout_patterns(self) -> list[str]: 

2259 """Return a list of sparse-checkout patterns from info/sparse-checkout. 

2260 

2261 Returns: 

2262 A list of patterns. Returns an empty list if the file is missing. 

2263 """ 

2264 path = self._sparse_checkout_file_path() 

2265 try: 

2266 with open(path, encoding="utf-8") as f: 

2267 return [line.strip() for line in f if line.strip()] 

2268 except FileNotFoundError: 

2269 return [] 

2270 

2271 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None: 

2272 """Write the given sparse-checkout patterns into info/sparse-checkout. 

2273 

2274 Creates the info/ directory if it does not exist. 

2275 

2276 Args: 

2277 patterns: A list of gitignore-style patterns to store. 

2278 """ 

2279 info_dir = os.path.join(self.controldir(), "info") 

2280 os.makedirs(info_dir, exist_ok=True) 

2281 

2282 path = self._sparse_checkout_file_path() 

2283 with open(path, "w", encoding="utf-8") as f: 

2284 for pat in patterns: 

2285 f.write(pat + "\n") 

2286 

2287 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None: 

2288 """Write the given cone-mode directory patterns into info/sparse-checkout. 

2289 

2290 For each directory to include, add an inclusion line that "undoes" the prior 

2291 ``!/*/`` 'exclude' that re-includes that directory and everything under it. 

2292 Never add the same line twice. 

2293 """ 

2294 patterns = ["/*", "!/*/"] 

2295 if dirs: 

2296 for d in dirs: 

2297 d = d.strip("/") 

2298 line = f"/{d}/" 

2299 if d and line not in patterns: 

2300 patterns.append(line) 

2301 self.set_sparse_checkout_patterns(patterns) 

2302 

2303 

2304class MemoryRepo(BaseRepo): 

2305 """Repo that stores refs, objects, and named files in memory. 

2306 

2307 MemoryRepos are always bare: they have no working tree and no index, since 

2308 those have a stronger dependency on the filesystem. 

2309 """ 

2310 

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries are captured in a list rather than on-disk logs.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        # Control-dir "named files" are emulated with a path -> bytes dict.
        self._named_files: dict[str, bytes] = {}
        # Memory repos are always bare: no working tree, no index.
        self.bare = True
        self._config = ConfigFile()
        self._description = None

2322 

2323 def _append_reflog(self, *args) -> None: 

2324 self._reflog.append(args) 

2325 

2326 def set_description(self, description) -> None: 

2327 self._description = description 

2328 

2329 def get_description(self): 

2330 return self._description 

2331 

2332 def _determine_file_mode(self): 

2333 """Probe the file-system to determine whether permissions can be trusted. 

2334 

2335 Returns: True if permissions can be trusted, False otherwise. 

2336 """ 

2337 return sys.platform != "win32" 

2338 

2339 def _determine_symlinks(self): 

2340 """Probe the file-system to determine whether permissions can be trusted. 

2341 

2342 Returns: True if permissions can be trusted, False otherwise. 

2343 """ 

2344 return sys.platform != "win32" 

2345 

2346 def _put_named_file(self, path, contents) -> None: 

2347 """Write a file to the control dir with the given name and contents. 

2348 

2349 Args: 

2350 path: The path to the file, relative to the control dir. 

2351 contents: A string to write to the file. 

2352 """ 

2353 self._named_files[path] = contents 

2354 

2355 def _del_named_file(self, path) -> None: 

2356 try: 

2357 del self._named_files[path] 

2358 except KeyError: 

2359 pass 

2360 

2361 def get_named_file(self, path, basedir=None): 

2362 """Get a file from the control dir with a specific name. 

2363 

2364 Although the filename should be interpreted as a filename relative to 

2365 the control dir in a disk-baked Repo, the object returned need not be 

2366 pointing to a file in that location. 

2367 

2368 Args: 

2369 path: The path to the file, relative to the control dir. 

2370 Returns: An open file object, or None if the file does not exist. 

2371 """ 

2372 contents = self._named_files.get(path, None) 

2373 if contents is None: 

2374 return None 

2375 return BytesIO(contents) 

2376 

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        # Memory repositories are always bare, so there is never an index.
        raise NoIndexPresent

2384 

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        # Returns the in-memory ConfigFile created in __init__.
        return self._config

2391 

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        from .rebase import MemoryRebaseStateManager

        # No disk state: rebase bookkeeping is held in memory with the repo.
        return MemoryRebaseStateManager(self)

2400 

2401 def get_blob_normalizer(self): 

2402 """Return a BlobNormalizer object for checkin/checkout operations.""" 

2403 from .filters import FilterBlobNormalizer, FilterRegistry 

2404 

2405 # Get GitAttributes object 

2406 git_attributes = self.get_gitattributes() 

2407 config_stack = self.get_config_stack() 

2408 

2409 # Create FilterRegistry with repo reference 

2410 filter_registry = FilterRegistry(config_stack, self) 

2411 

2412 # Return FilterBlobNormalizer which handles all filters 

2413 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self) 

2414 

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository."""
        from .attrs import GitAttributes

        # Memory repos don't have working trees or gitattributes files,
        # so the result is always an empty GitAttributes.
        return GitAttributes([])

2422 

2423 @classmethod 

2424 def init_bare(cls, objects, refs, format: Optional[int] = None): 

2425 """Create a new bare repository in memory. 

2426 

2427 Args: 

2428 objects: Objects for the new repository, 

2429 as iterable 

2430 refs: Refs as dictionary, mapping names 

2431 to object SHA1s 

2432 format: Repository format version (defaults to 0) 

2433 """ 

2434 ret = cls() 

2435 for obj in objects: 

2436 ret.object_store.add_object(obj) 

2437 for refname, sha in refs.items(): 

2438 ret.refs.add_if_new(refname, sha) 

2439 ret._init_files(bare=True, format=format) 

2440 return ret