Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1114 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

# Names exported by ``from <this module> import *``; constants are listed
# first, followed by classes and then functions.
__all__ = [
    "BASE_DIRECTORIES",
    "COMMONDIR",
    "CONTROLDIR",
    "DEFAULT_BRANCH",
    "DEFAULT_OFS_DELTA",
    "GITDIR",
    "INDEX_FILENAME",
    "OBJECTDIR",
    "REFSDIR",
    "REFSDIR_HEADS",
    "REFSDIR_TAGS",
    "WORKTREES",
    "BaseRepo",
    "DefaultIdentityNotFound",
    "InvalidUserIdentity",
    "MemoryRepo",
    "ParentsProvider",
    "Repo",
    "UnsupportedExtension",
    "UnsupportedVersion",
    "check_user_identity",
    "get_user_identity",
    "parse_graftpoints",
    "parse_shared_repository",
    "read_gitfile",
    "serialize_graftpoints",
]

60 

61import os 

62import stat 

63import sys 

64import time 

65import warnings 

66from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence 

67from io import BytesIO 

68from types import TracebackType 

69from typing import ( 

70 TYPE_CHECKING, 

71 Any, 

72 BinaryIO, 

73 TypeVar, 

74) 

75 

76if TYPE_CHECKING: 

77 # There are no circular imports here, but we try to defer imports as long 

78 # as possible to reduce start-up time for anything that doesn't need 

79 # these imports. 

80 from .attrs import GitAttributes 

81 from .config import ConditionMatcher, ConfigFile, StackedConfig 

82 from .diff_tree import RenameDetector 

83 from .filters import FilterBlobNormalizer, FilterContext 

84 from .index import Index 

85 from .notes import Notes 

86 from .object_format import ObjectFormat 

87 from .object_store import BaseObjectStore, GraphWalker 

88 from .pack import UnpackedObject 

89 from .rebase import RebaseStateManager 

90 from .walk import Walker 

91 from .worktree import WorkTree 

92 

93from . import reflog 

94from .errors import ( 

95 NoIndexPresent, 

96 NotBlobError, 

97 NotCommitError, 

98 NotGitRepository, 

99 NotTagError, 

100 NotTreeError, 

101 RefFormatError, 

102) 

103from .file import GitFile 

104from .hooks import ( 

105 CommitMsgShellHook, 

106 Hook, 

107 PostCommitShellHook, 

108 PostReceiveShellHook, 

109 PreCommitShellHook, 

110 PreReceiveShellHook, 

111 UpdateShellHook, 

112) 

113from .object_store import ( 

114 DiskObjectStore, 

115 MemoryObjectStore, 

116 MissingObjectFinder, 

117 ObjectStoreGraphWalker, 

118 PackBasedObjectStore, 

119 PackCapableObjectStore, 

120 find_shallow, 

121 peel_sha, 

122) 

123from .objects import ( 

124 Blob, 

125 Commit, 

126 ObjectID, 

127 RawObjectID, 

128 ShaFile, 

129 Tag, 

130 Tree, 

131 check_hexsha, 

132 valid_hexsha, 

133) 

134from .pack import generate_unpacked_objects 

135from .refs import ( 

136 HEADREF, 

137 LOCAL_TAG_PREFIX, # noqa: F401 

138 SYMREF, # noqa: F401 

139 DictRefsContainer, 

140 DiskRefsContainer, 

141 Ref, 

142 RefsContainer, 

143 _set_default_branch, 

144 _set_head, 

145 _set_origin_head, 

146 check_ref_format, # noqa: F401 

147 extract_branch_name, 

148 is_per_worktree_ref, 

149 local_branch_name, 

150 read_packed_refs, # noqa: F401 

151 read_packed_refs_with_peeled, # noqa: F401 

152 write_packed_refs, # noqa: F401 

153) 

154 

# File and directory names used when laying out a repository.
# NOTE(review): presumably these are relative to the control directory
# (CONTROLDIR itself being relative to the working tree) -- confirm against
# the callers, which are not visible in this part of the file.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
# Fallback used by BaseRepo.generate_pack_data() when ofs_delta is None.
DEFAULT_OFS_DELTA = True

# Type variable bound to ShaFile; lets BaseRepo._get_object() return the
# concrete object class that the caller asked for.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Each entry is a list of path components naming one directory.
# NOTE(review): presumably these are created when a repository is
# initialised -- confirm against the init code.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name as bytes.
# NOTE(review): presumably the branch used for newly created repositories
# -- confirm.
DEFAULT_BRANCH = b"master"

178 

179 

class InvalidUserIdentity(Exception):
    """Raised for a user identity that is not shaped like 'user <email>'.

    Attributes:
      identity: The identity string that was rejected.
    """

    def __init__(self, identity: str) -> None:
        """Remember the rejected identity on the exception instance.

        Args:
          identity: The identity string that failed validation.
        """
        self.identity = identity

186 

187 

class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be worked out."""

190 

191 

192# TODO(jelmer): Cache? 

def _get_default_identity() -> tuple[str, str]:
    """Work out a default (full name, email) pair for the current user.

    The username is the first non-empty value among the ``LOGNAME``,
    ``USER``, ``LNAME`` and ``USERNAME`` environment variables, falling
    back to the passwd entry of the current uid when the ``pwd`` module is
    available.  The full name is the first comma-separated part of that
    entry's GECOS field, or the username when no GECOS name exists.  The
    email is ``$EMAIL`` if set, otherwise ``<username>@<hostname>``.

    Returns:
      Tuple of (full name, email address).

    Raises:
      DefaultIdentityNotFound: If a username is needed but none was found.
    """
    import socket

    username: str | None = None
    for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
        candidate = os.environ.get(name)
        if candidate:
            username = candidate
            break

    fullname: str | None = None
    try:
        import pwd
    except ImportError:
        # No passwd database on this platform; rely on the environment only.
        pass
    else:
        try:
            entry = pwd.getpwuid(os.getuid())  # type: ignore[attr-defined,unused-ignore]
        except KeyError:
            # The current uid has no passwd entry.
            pass
        else:
            # The passwd struct names this field ``pw_gecos``; probing for a
            # bare ``gecos`` attribute never matched, so the real name was
            # silently ignored.
            gecos = getattr(entry, "pw_gecos", None)
            if gecos:
                fullname = gecos.split(",")[0]
            if username is None:
                username = entry.pw_name
    if not fullname:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        fullname = username
    email = os.environ.get("EMAIL")
    if email is None:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        email = f"{username}@{socket.gethostname()}"
    return (fullname, email)

229 

230 

def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current user's identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f)).
    The host system is only consulted when the name or the email is
    still unknown at that point, so a fully configured identity never
    fails because the host lookup does.

    Args:
      config: Configuration stack to read from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity as bytes, formatted ``name <email>``

    Raises:
      DefaultIdentityNotFound: If the host fallback is needed and fails.
    """
    user: bytes | None = None
    email: bytes | None = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Only probe the host when something is actually missing: the probe
        # can raise DefaultIdentityNotFound and may need a hostname lookup.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Tolerate an email that was already wrapped in angle brackets.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"

280 

281 

def check_user_identity(identity: bytes) -> None:
    """Validate the shape of a user identity.

    An identity passes when it contains ``" <"``, has a ``">"`` somewhere
    after the first ``" <"``, and contains neither a NUL byte nor a newline.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        # No " <" separator at all.
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    unterminated = b">" not in remainder
    if unterminated or any(bad in identity for bad in (b"\0", b"\n")):
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))

298 

299 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[ObjectID, list[ObjectID]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Blank lines and lines starting with ``#`` are skipped, as git does
    when it reads a grafts file.

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    Raises whatever ``check_hexsha`` raises when a field is not a valid
    hex SHA.

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[ObjectID, list[ObjectID]] = {}
    for line in graftpoints:
        fields = line.split()
        # A blank line used to crash with IndexError on fields[0]; skip it,
        # together with comment lines.
        if not fields or line.startswith(b"#"):
            continue

        commit = ObjectID(fields[0])
        parents = [ObjectID(parent) for parent in fields[1:]]

        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts

331 

332 

def serialize_graftpoints(graftpoints: Mapping[ObjectID, Sequence[ObjectID]]) -> bytes:
    """Render a graft dictionary as newline-separated graft lines.

    Input maps each commit to its parents:
        <commit sha1>: [<parent sha1>*]

    Each output line has the form:
        <commit sha1> <parent sha1> [<parent sha1>]*

    A commit without parents is written on its own; the result has no
    trailing newline.

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    return b"\n".join(
        (commit + b" " + b" ".join(parents)) if parents else commit
        for commit, parents in graftpoints.items()
    )

352 

353 

def _set_filesystem_hidden(path: str) -> None:
    """Flag ``path`` as hidden where the platform offers a way to do so.

    Only win32 is handled, through the SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
    A failing call is ignored. Every other platform is a no-op.
    """
    if sys.platform != "win32":
        # Could implement other platform specific filesystem hiding here
        return

    import ctypes
    from ctypes.wintypes import BOOL, DWORD, LPCWSTR

    FILE_ATTRIBUTE_HIDDEN = 2
    prototype = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)
    SetFileAttributesW = prototype(("SetFileAttributesW", ctypes.windll.kernel32))

    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
        pass  # Could raise or log `ctypes.WinError()` here

375 

376 

def parse_shared_repository(
    value: str | bytes | bool,
) -> tuple[int | None, int | None]:
    """Translate a core.sharedRepository value into chmod masks.

    Args:
      value: Configuration value (string, bytes, or boolean)

    Returns:
      tuple of (file_mask, directory_mask) or (None, None) if not shared

    Recognised inputs: booleans, the keywords false/0/empty/umask (not
    shared), true/1/group, all/world/everybody/2, and octal strings that
    start with "0". Anything else is treated as not shared.
    """
    not_shared: tuple[int | None, int | None] = (None, None)
    group_shared = (0o664, 0o2775)  # group writable, setgid directories
    world_shared = (0o666, 0o2777)  # world writable, setgid directories

    if isinstance(value, bytes):
        value = value.decode("utf-8", errors="replace")

    if isinstance(value, bool):
        # True behaves like "group"; False leaves everything to the umask.
        return group_shared if value else not_shared

    keywords: dict[str, tuple[int | None, int | None]] = {
        "false": not_shared,
        "0": not_shared,
        "": not_shared,
        "umask": not_shared,
        "true": group_shared,
        "1": group_shared,
        "group": group_shared,
        "all": world_shared,
        "world": world_shared,
        "everybody": world_shared,
        "2": world_shared,
    }
    keyword = value.lower()
    if keyword in keywords:
        return keywords[keyword]

    if value.startswith("0"):
        try:
            file_mode = int(value, 8)
        except ValueError:
            return not_shared
        # Directories get the setgid bit, plus an execute bit wherever the
        # matching read bit is set (each read bit sits two positions above
        # its execute bit).
        dir_mode = file_mode | 0o2000 | ((file_mode & 0o444) >> 2)
        return (file_mode, dir_mode)

    # Unrecognised values fall back to the umask.
    return not_shared

440 

441 

class ParentsProvider:
    """Provider for commit parent information.

    Parents are resolved in this order: graft overrides, shallow
    commits (reported as parentless), the store's commit graph when it
    has an answer, and finally the commit object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: dict[ObjectID, list[ObjectID]] | None = None,
        shallows: Iterable[ObjectID] | None = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information, mapping a commit to its replacement
            parents. The dict is kept by reference, so later changes by
            the caller are seen here. Defaults to a fresh empty dict.
          shallows: Shallow commit SHAs; copied into a set. Defaults to
            no shallow commits.
        """
        self.store = store
        # None instead of a mutable default: a literal {} default would be
        # one dict shared by every instance created without grafts.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: ObjectID, commit: Commit | None = None
    ) -> list[ObjectID]:
        """Get parents for a commit using the parents provider.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional already-loaded commit matching ``commit_id``;
            saves a store lookup in the fallback path

        Returns:
          List of parent SHAs

        Raises:
          ValueError: If the store holds a non-commit object for commit_id
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            if not isinstance(obj, Commit):
                raise ValueError(
                    f"Expected Commit object for commit_id {commit_id.decode()}, "
                    f"got {type(obj).__name__}. This usually means a reference "
                    f"points to a {type(obj).__name__} object instead of a Commit."
                )
            commit = obj
        result: list[ObjectID] = commit.parents
        return result

494 

495 

496class BaseRepo: 

497 """Base class for a git repository. 

498 

499 This base class is meant to be used for Repository implementations that e.g. 

500 work on top of a different transport than a standard filesystem path. 

501 

502 Attributes: 

503 object_store: Dictionary-like object for accessing 

504 the objects 

505 refs: Dictionary-like object with the refs in this 

506 repository 

507 """ 

508 

509 def __init__( 

510 self, 

511 object_store: "PackCapableObjectStore", 

512 refs: RefsContainer, 

513 object_format: "ObjectFormat | None" = None, 

514 ) -> None: 

515 """Open a repository. 

516 

517 This shouldn't be called directly, but rather through one of the 

518 base classes, such as MemoryRepo or Repo. 

519 

520 Args: 

521 object_store: Object store to use 

522 refs: Refs container to use 

523 object_format: Hash algorithm to use (if None, will use object_store's format) 

524 """ 

525 self.object_store = object_store 

526 self.refs = refs 

527 

528 self._graftpoints: dict[ObjectID, list[ObjectID]] = {} 

529 self.hooks: dict[str, Hook] = {} 

530 if object_format is None: 

531 self.object_format: ObjectFormat = object_store.object_format 

532 else: 

533 self.object_format = object_format 

534 

535 def _determine_file_mode(self) -> bool: 

536 """Probe the file-system to determine whether permissions can be trusted. 

537 

538 Returns: True if permissions can be trusted, False otherwise. 

539 """ 

540 raise NotImplementedError(self._determine_file_mode) 

541 

542 def _determine_symlinks(self) -> bool: 

543 """Probe the filesystem to determine whether symlinks can be created. 

544 

545 Returns: True if symlinks can be created, False otherwise. 

546 """ 

547 # For now, just mimic the old behaviour 

548 return sys.platform != "win32" 

549 

550 def _init_files( 

551 self, 

552 bare: bool, 

553 symlinks: bool | None = None, 

554 format: int | None = None, 

555 shared_repository: str | bool | None = None, 

556 object_format: str | None = None, 

557 ) -> None: 

558 """Initialize a default set of named files.""" 

559 from .config import ConfigFile 

560 

561 self._put_named_file("description", b"Unnamed repository") 

562 f = BytesIO() 

563 cf = ConfigFile() 

564 

565 # Determine the appropriate format version 

566 if object_format == "sha256": 

567 # SHA256 requires format version 1 

568 if format is None: 

569 format = 1 

570 elif format != 1: 

571 raise ValueError( 

572 "SHA256 object format requires repository format version 1" 

573 ) 

574 else: 

575 # SHA1 (default) can use format 0 or 1 

576 if format is None: 

577 format = 0 

578 

579 if format not in (0, 1): 

580 raise ValueError(f"Unsupported repository format version: {format}") 

581 

582 cf.set("core", "repositoryformatversion", str(format)) 

583 

584 # Set object format extension if using SHA256 

585 if object_format == "sha256": 

586 cf.set("extensions", "objectformat", "sha256") 

587 

588 # Set hash algorithm based on object format 

589 from .object_format import get_object_format 

590 

591 self.object_format = get_object_format(object_format) 

592 

593 if self._determine_file_mode(): 

594 cf.set("core", "filemode", True) 

595 else: 

596 cf.set("core", "filemode", False) 

597 

598 if symlinks is None and not bare: 

599 symlinks = self._determine_symlinks() 

600 

601 if symlinks is False: 

602 cf.set("core", "symlinks", symlinks) 

603 

604 # On macOS, set precomposeunicode to true since HFS+/APFS 

605 # returns filenames in NFD (decomposed) Unicode form 

606 if sys.platform == "darwin": 

607 cf.set("core", "precomposeunicode", True) 

608 

609 cf.set("core", "bare", bare) 

610 cf.set("core", "logallrefupdates", True) 

611 

612 # Set shared repository if specified 

613 if shared_repository is not None: 

614 if isinstance(shared_repository, bool): 

615 cf.set("core", "sharedRepository", shared_repository) 

616 else: 

617 cf.set("core", "sharedRepository", shared_repository) 

618 

619 cf.write_to_file(f) 

620 self._put_named_file("config", f.getvalue()) 

621 self._put_named_file(os.path.join("info", "exclude"), b"") 

622 

623 # Allow subclasses to handle config initialization 

624 self._init_config(cf) 

625 

626 def _init_config(self, config: "ConfigFile") -> None: 

627 """Initialize repository configuration. 

628 

629 This method can be overridden by subclasses to handle config initialization. 

630 

631 Args: 

632 config: The ConfigFile object that was just created 

633 """ 

634 # Default implementation does nothing 

635 

636 def get_named_file(self, path: str) -> BinaryIO | None: 

637 """Get a file from the control dir with a specific name. 

638 

639 Although the filename should be interpreted as a filename relative to 

640 the control dir in a disk-based Repo, the object returned need not be 

641 pointing to a file in that location. 

642 

643 Args: 

644 path: The path to the file, relative to the control dir. 

645 Returns: An open file object, or None if the file does not exist. 

646 """ 

647 raise NotImplementedError(self.get_named_file) 

648 

649 def _put_named_file(self, path: str, contents: bytes) -> None: 

650 """Write a file to the control dir with the given name and contents. 

651 

652 Args: 

653 path: The path to the file, relative to the control dir. 

654 contents: A string to write to the file. 

655 """ 

656 raise NotImplementedError(self._put_named_file) 

657 

658 def _del_named_file(self, path: str) -> None: 

659 """Delete a file in the control directory with the given name.""" 

660 raise NotImplementedError(self._del_named_file) 

661 

662 def open_index(self) -> "Index": 

663 """Open the index for this repository. 

664 

665 Raises: 

666 NoIndexPresent: If no index is present 

667 Returns: The matching `Index` 

668 """ 

669 raise NotImplementedError(self.open_index) 

670 

671 def _change_object_format(self, object_format_name: str) -> None: 

672 """Change the object format of this repository. 

673 

674 This can only be done if the object store is empty (no objects written yet). 

675 

676 Args: 

677 object_format_name: Name of the new object format (e.g., "sha1", "sha256") 

678 

679 Raises: 

680 AssertionError: If the object store is not empty 

681 """ 

682 # Check if object store has any objects 

683 for _ in self.object_store: 

684 raise AssertionError( 

685 "Cannot change object format: repository already contains objects" 

686 ) 

687 

688 # Update the object format 

689 from .object_format import get_object_format 

690 

691 new_format = get_object_format(object_format_name) 

692 self.object_format = new_format 

693 self.object_store.object_format = new_format 

694 

695 # Update config file 

696 config = self.get_config() 

697 

698 if object_format_name == "sha1": 

699 # For SHA-1, explicitly remove objectformat extension if present 

700 try: 

701 config.remove("extensions", "objectformat") 

702 except KeyError: 

703 pass 

704 else: 

705 # For non-SHA-1 formats, set repositoryformatversion to 1 and objectformat extension 

706 config.set("core", "repositoryformatversion", "1") 

707 config.set("extensions", "objectformat", object_format_name) 

708 

709 config.write_to_path() 

710 

711 def fetch( 

712 self, 

713 target: "BaseRepo", 

714 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]] 

715 | None = None, 

716 progress: Callable[..., None] | None = None, 

717 depth: int | None = None, 

718 ) -> dict[Ref, ObjectID]: 

719 """Fetch objects into another repository. 

720 

721 Args: 

722 target: The target repository 

723 determine_wants: Optional function to determine what refs to 

724 fetch. 

725 progress: Optional progress function 

726 depth: Optional shallow fetch depth 

727 Returns: The local refs 

728 """ 

729 # Fix object format if needed 

730 if self.object_format != target.object_format: 

731 # Change the target repo's format if it's empty 

732 target._change_object_format(self.object_format.name) 

733 

734 if determine_wants is None: 

735 determine_wants = target.object_store.determine_wants_all 

736 count, pack_data = self.fetch_pack_data( 

737 determine_wants, 

738 target.get_graph_walker(), 

739 progress=progress, 

740 depth=depth, 

741 ) 

742 target.object_store.add_pack_data(count, pack_data, progress) 

743 return self.get_refs() 

744 

745 def fetch_pack_data( 

746 self, 

747 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]], 

748 graph_walker: "GraphWalker", 

749 progress: Callable[[bytes], None] | None, 

750 *, 

751 get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None, 

752 depth: int | None = None, 

753 ) -> tuple[int, Iterator["UnpackedObject"]]: 

754 """Fetch the pack data required for a set of revisions. 

755 

756 Args: 

757 determine_wants: Function that takes a dictionary with heads 

758 and returns the list of heads to fetch. 

759 graph_walker: Object that can iterate over the list of revisions 

760 to fetch and has an "ack" method that will be called to acknowledge 

761 that a revision is present. 

762 progress: Simple progress function that will be called with 

763 updated progress strings. 

764 get_tagged: Function that returns a dict of pointed-to sha -> 

765 tag sha for including tags. 

766 depth: Shallow fetch depth 

767 Returns: count and iterator over pack data 

768 """ 

769 missing_objects = self.find_missing_objects( 

770 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

771 ) 

772 if missing_objects is None: 

773 return 0, iter([]) 

774 remote_has = missing_objects.get_remote_has() 

775 object_ids = list(missing_objects) 

776 return len(object_ids), generate_unpacked_objects( 

777 self.object_store, object_ids, progress=progress, other_haves=remote_has 

778 ) 

779 

    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: a MissingObjectFinder, or None when nothing is wanted and
          shallow/unshallow information is present (no pack is sent then)
        Raises:
          TypeError: if determine_wants() does not return a list
        """
        import logging

        # Filter out refs pointing to missing objects to avoid errors downstream.
        # This makes Dulwich more robust when dealing with broken refs on disk.
        # Previously serialize_refs() did this filtering as a side-effect.
        all_refs = self.get_refs()
        refs: dict[Ref, ObjectID] = {}
        for ref, sha in all_refs.items():
            if sha in self.object_store:
                refs[ref] = sha
            else:
                logging.warning(
                    "ref %s points at non-present sha %s",
                    ref.decode("utf-8", "replace"),
                    sha.decode("ascii"),
                )

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before it is updated below, so
        # the newly shallow / unshallowed commits can be computed as deltas.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            # NOTE(review): the nesting below was reconstructed from a
            # flattened listing. As read here, `unshallow` is never bound
            # when the walker has no `shallow` attribute -- confirm against
            # the real file.
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        # Uses the shallow set as it was before this call updated the walker.
        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[ObjectID]:
            """Get parents for a commit using the parents provider.

            Args:
              commit: Commit object

            Returns:
              List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

890 

891 def generate_pack_data( 

892 self, 

893 have: set[ObjectID], 

894 want: set[ObjectID], 

895 *, 

896 shallow: set[ObjectID] | None = None, 

897 progress: Callable[[str], None] | None = None, 

898 ofs_delta: bool | None = None, 

899 ) -> tuple[int, Iterator["UnpackedObject"]]: 

900 """Generate pack data objects for a set of wants/haves. 

901 

902 Args: 

903 have: List of SHA1s of objects that should not be sent 

904 want: List of SHA1s of objects that should be sent 

905 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits) 

906 ofs_delta: Whether OFS deltas can be included 

907 progress: Optional progress reporting method 

908 """ 

909 if shallow is None: 

910 shallow = self.get_shallow() 

911 return self.object_store.generate_pack_data( 

912 have, 

913 want, 

914 shallow=shallow, 

915 progress=progress, 

916 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA, 

917 ) 

918 

919 def get_graph_walker( 

920 self, heads: list[ObjectID] | None = None 

921 ) -> ObjectStoreGraphWalker: 

922 """Retrieve a graph walker. 

923 

924 A graph walker is used by a remote repository (or proxy) 

925 to find out which objects are present in this repository. 

926 

927 Args: 

928 heads: Repository heads to use (optional) 

929 Returns: A graph walker object 

930 """ 

931 if heads is None: 

932 heads = [ 

933 sha 

934 for sha in self.refs.as_dict(Ref(b"refs/heads")).values() 

935 if sha in self.object_store 

936 ] 

937 parents_provider = ParentsProvider(self.object_store) 

938 return ObjectStoreGraphWalker( 

939 heads, 

940 parents_provider.get_parents, 

941 shallow=self.get_shallow(), 

942 update_shallow=self.update_shallow, 

943 ) 

944 

945 def get_refs(self) -> dict[Ref, ObjectID]: 

946 """Get dictionary with all refs. 

947 

948 Returns: A ``dict`` mapping ref names to SHA1s 

949 """ 

950 return self.refs.as_dict() 

951 

952 def head(self) -> ObjectID: 

953 """Return the SHA1 pointed at by HEAD.""" 

954 # TODO: move this method to WorkTree 

955 return self.refs[HEADREF] 

956 

957 def _get_object(self, sha: ObjectID | RawObjectID, cls: type[T]) -> T: 

958 assert len(sha) in ( 

959 self.object_format.oid_length, 

960 self.object_format.hex_length, 

961 ) 

962 ret = self.get_object(sha) 

963 if not isinstance(ret, cls): 

964 if cls is Commit: 

965 raise NotCommitError(ret.id) 

966 elif cls is Blob: 

967 raise NotBlobError(ret.id) 

968 elif cls is Tree: 

969 raise NotTreeError(ret.id) 

970 elif cls is Tag: 

971 raise NotTagError(ret.id) 

972 else: 

973 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

974 return ret 

975 

976 def get_object(self, sha: ObjectID | RawObjectID) -> ShaFile: 

977 """Retrieve the object with the specified SHA. 

978 

979 Args: 

980 sha: SHA to retrieve 

981 Returns: A ShaFile object 

982 Raises: 

983 KeyError: when the object can not be found 

984 """ 

985 return self.object_store[sha] 

986 

def parents_provider(self) -> ParentsProvider:
    """Build a parents provider for this repository.

    Returns:
      A new ParentsProvider over the object store, constructed with the
      repository's graftpoints and its current set of shallow commits
    """
    graftpoints = self._graftpoints
    shallow_commits = self.get_shallow()
    return ParentsProvider(
        self.object_store,
        grafts=graftpoints,
        shallows=shallow_commits,
    )

998 

def get_parents(
    self, sha: ObjectID, commit: Commit | None = None
) -> list[ObjectID]:
    """Return the parents of a commit.

    The lookup is delegated to a freshly built parents provider, so the
    parents recorded for a graftpoint replace the commit's own parents.

    Args:
      sha: SHA of the commit whose parents are wanted
      commit: Optional commit object matching ``sha``
    Returns: List of parents
    """
    provider = self.parents_provider()
    return provider.get_parents(sha, commit)

1013 

def get_config(self) -> "ConfigFile":
    """Return the repository's configuration.

    This base implementation always raises; subclasses provide the
    actual lookup.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.get_config)

1020 

def get_worktree_config(self) -> "ConfigFile":
    """Return the worktree configuration.

    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.get_worktree_config)

1024 

def get_description(self) -> bytes | None:
    """Return the user-supplied description of this repository.

    Returns: Bytes with the description of the repository
      as set by the user.
    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.get_description)

1032 

def set_description(self, description: bytes) -> None:
    """Store a new description for this repository.

    Args:
      description: Text to set as description for this repository.
    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.set_description)

1040 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Return the rebase state manager suited to this repository.

    Returns: RebaseStateManager instance
    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.get_rebase_state_manager)

1047 

def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return the blob normalizer used for checkin/checkout operations.

    Returns: BlobNormalizer instance
    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.get_blob_normalizer)

1054 

def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Load the gitattributes that apply to this repository.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths

    Raises:
      NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.get_gitattributes)

1065 

def get_config_stack(self) -> "StackedConfig":
    """Build the stacked configuration for this repository.

    The stack consults the repository's own configuration first, then
    the worktree configuration when ``extensions.worktreeconfig`` is
    enabled, and finally the default backends reported by
    ``StackedConfig.default_backends()``. Writes go to the repository
    configuration.

    Returns: `Config` instance for this repository
    """
    from .config import ConfigFile, StackedConfig

    repo_config = self.get_config()
    uses_worktree_config = repo_config.get_boolean(
        (b"extensions",), b"worktreeconfig", False
    )

    backends: list[ConfigFile] = [repo_config]
    if uses_worktree_config:
        backends.append(self.get_worktree_config())
    backends += StackedConfig.default_backends()

    return StackedConfig(backends, writable=repo_config)

1084 

def get_shallow(self) -> set[ObjectID]:
    """Get the set of shallow commits.

    Reads the ``shallow`` named file, one commit id per line. Lines that
    are empty after stripping whitespace are ignored, so a stray blank
    line never turns into an empty object id in the result (the same
    filtering ``_read_heads`` applies).

    Returns: Set of shallow commits; empty when there is no shallow file.
    """
    shallow_file = self.get_named_file("shallow")
    if shallow_file is None:
        return set()
    with shallow_file:
        return {
            ObjectID(sha) for line in shallow_file if (sha := line.strip())
        }

1095 

def update_shallow(
    self, new_shallow: set[ObjectID] | None, new_unshallow: set[ObjectID] | None
) -> None:
    """Update the list of shallow objects.

    The current shallow set is read, ``new_shallow`` is added to it and
    ``new_unshallow`` is removed from it. A non-empty result is written
    back to the ``shallow`` named file, one id per line in sorted order
    so that the file content is deterministic; an empty result removes
    the file instead.

    Args:
      new_shallow: Newly shallow objects
      new_unshallow: Newly no longer shallow objects
    """
    shallow = self.get_shallow()
    if new_shallow:
        shallow.update(new_shallow)
    if new_unshallow:
        shallow.difference_update(new_unshallow)
    if shallow:
        # Sets iterate in arbitrary order; sort for reproducible output.
        self._put_named_file(
            "shallow", b"".join(sha + b"\n" for sha in sorted(shallow))
        )
    else:
        self._del_named_file("shallow")

1114 

def get_peeled(self, ref: Ref) -> ObjectID:
    """Return the fully peeled value of a ref.

    A peeled value already known to the refs container is returned
    directly; otherwise the ref's target is peeled through the object
    store.

    Args:
      ref: The refname to peel.
    Returns: The fully-peeled SHA1 of a tag object, after peeling all
      intermediate tags; if the original ref does not point to a tag,
      this will equal the original SHA1.
    """
    peeled = self.refs.get_peeled(ref)
    if peeled is None:
        peeled = peel_sha(self.object_store, self.refs[ref])[1].id
    return peeled

1128 

@property
def notes(self) -> "Notes":
    """Notes interface for this repository.

    Returns:
      A new Notes object built over this repository's object store and
      refs
    """
    from .notes import Notes

    notes_api = Notes(self.object_store, self.refs)
    return notes_api

1139 

def get_walker(
    self,
    include: Sequence[ObjectID] | None = None,
    exclude: Sequence[ObjectID] | None = None,
    order: str = "date",
    reverse: bool = False,
    max_entries: int | None = None,
    paths: Sequence[bytes] | None = None,
    rename_detector: "RenameDetector | None" = None,
    follow: bool = False,
    since: int | None = None,
    until: int | None = None,
    queue_cls: type | None = None,
) -> "Walker":
    """Create a commit walker over this repository.

    Parents are resolved through ``self.get_parents``, so graftpoints
    configured on the repository are honoured while walking.

    Args:
      include: Iterable of SHAs of commits to include along with their
        ancestors. Defaults to [HEAD]
      exclude: Iterable of SHAs of commits to exclude along with their
        ancestors, overriding includes.
      order: ORDER_* constant specifying the order of results.
        Anything other than ORDER_DATE may result in O(n) memory usage.
      reverse: If True, reverse the order of output, requiring O(n)
        memory.
      max_entries: The maximum number of entries to yield, or None for
        no limit.
      paths: Iterable of file or subtree paths to show entries for.
      rename_detector: diff.RenameDetector object for detecting
        renames.
      follow: If True, follow path across renames/copies. Forces a
        default rename_detector.
      since: Timestamp to list commits after.
      until: Timestamp to list commits before.
      queue_cls: A class to use for a queue of commits, supporting the
        iterator protocol. The constructor takes a single argument, the
        Walker. Defaults to the walk module's ``_CommitTimeQueue``.

    Returns: A `Walker` object
    """
    from .walk import Walker, _CommitTimeQueue

    if include is None:
        include = [self.head()]
    if queue_cls is None:
        queue_cls = _CommitTimeQueue

    def commit_parents(commit):
        # Route parent lookups through the repository (graft-aware).
        return self.get_parents(commit.id, commit)

    # Every option is forwarded by keyword rather than through **kwargs
    # to keep the call type-checkable.
    return Walker(
        self.object_store,
        include,
        exclude=exclude,
        order=order,
        reverse=reverse,
        max_entries=max_entries,
        paths=paths,
        rename_detector=rename_detector,
        follow=follow,
        since=since,
        until=until,
        get_parents=commit_parents,
        queue_cls=queue_cls,
    )

1200 

def __getitem__(self, name: ObjectID | Ref | bytes) -> "ShaFile":
    """Look up a Git object by ref name or by SHA.

    ``HEAD`` and names starting with ``refs/`` are tried as refs first.
    Any name whose length matches the object format's binary or hex id
    length is then tried as an object id.

    Args:
      name: A Git object SHA1 or a ref name
    Returns: A `ShaFile` object, such as a Commit or Blob
    Raises:
      TypeError: when ``name`` is not a bytestring
      KeyError: when the specified ref or object does not exist
    """
    if not isinstance(name, bytes):
        raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")

    # Ref-like names: resolve through the refs container first.
    looks_like_ref = name == b"HEAD" or name.startswith(b"refs/")
    if looks_like_ref:
        try:
            return self.object_store[self.refs[Ref(name)]]
        except (RefFormatError, KeyError):
            pass

    # Names with the length of an object id: try the object store.
    object_format = self.object_store.object_format
    if len(name) in (object_format.oid_length, object_format.hex_length):
        try:
            if len(name) == object_format.hex_length:
                key = ObjectID(name)
            else:
                key = RawObjectID(name)
            return self.object_store[key]
        except (KeyError, ValueError):
            pass

    # Neither lookup succeeded.
    raise KeyError(name)

1233 

def __contains__(self, name: bytes) -> bool:
    """Report whether a Git object or ref with this name exists.

    Names of length 20, or of length 40 that are valid hex, are looked
    up in the object store and then in the refs. The same is done for
    names matching the repository object format's binary and hex id
    lengths. Any other name is looked up in the refs only.

    Args:
      name: Git object SHA1/SHA256 or ref name
    """

    def in_store_or_refs(object_id) -> bool:
        return object_id in self.object_store or Ref(name) in self.refs

    size = len(name)
    if size == 20:
        return in_store_or_refs(RawObjectID(name))
    if size == 40 and valid_hexsha(name):
        return in_store_or_refs(ObjectID(name))
    # Check if it's a binary or hex SHA
    if size == self.object_format.oid_length:
        return in_store_or_refs(RawObjectID(name))
    if size == self.object_format.hex_length and valid_hexsha(name):
        return in_store_or_refs(ObjectID(name))
    return Ref(name) in self.refs

1251 

def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
    """Point a ref at a new value.

    Args:
      name: ref name; must start with ``refs/`` or equal HEAD
      value: Ref value - either a ShaFile object, or a hex sha
    Raises:
      ValueError: when ``name`` is not an acceptable ref name
      TypeError: when ``value`` is neither a ShaFile nor bytes
    """
    if not (name.startswith(b"refs/") or name == HEADREF):
        raise ValueError(name)

    ref_name = Ref(name)
    if isinstance(value, ShaFile):
        new_target = value.id
    elif isinstance(value, bytes):
        new_target = ObjectID(value)
    else:
        raise TypeError(value)
    self.refs[ref_name] = new_target

1269 

def __delitem__(self, name: bytes) -> None:
    """Delete a ref.

    Args:
      name: Name of the ref to remove; must start with ``refs/`` or
        equal HEAD
    Raises:
      ValueError: when ``name`` is not an acceptable ref name
    """
    if not (name.startswith(b"refs/") or name == HEADREF):
        raise ValueError(name)
    del self.refs[Ref(name)]

1280 

def _get_user_identity(
    self, config: "StackedConfig", kind: str | None = None
) -> bytes:
    """Determine the identity to use for new commits.

    Deprecated: emits a DeprecationWarning and delegates to the
    module-level ``get_user_identity()``.

    Args:
      config: Configuration to read the identity from
      kind: Accepted for backwards compatibility; not forwarded to
        ``get_user_identity()``
    Returns: The identity as returned by ``get_user_identity(config)``
    """
    warnings.warn(
        "use get_user_identity() rather than Repo._get_user_identity",
        DeprecationWarning,
        # Attribute the warning to the caller, not to this shim.
        stacklevel=2,
    )
    return get_user_identity(config)

1290 

def _add_graftpoints(
    self, updated_graftpoints: dict[ObjectID, list[ObjectID]]
) -> None:
    """Register new graftpoints or replace existing ones.

    Every commit and parent sha is validated with ``check_hexsha``
    before any entry is stored.

    Args:
      updated_graftpoints: Dict of commit shas to list of parent shas
    """
    # Simple validation
    for commit_sha, parent_shas in updated_graftpoints.items():
        for candidate in (commit_sha, *parent_shas):
            check_hexsha(candidate, "Invalid graftpoint")

    self._graftpoints.update(updated_graftpoints)

1305 

def _remove_graftpoints(self, to_remove: Sequence[ObjectID] = ()) -> None:
    """Drop graftpoints.

    Args:
      to_remove: List of commit shas
    Raises:
      KeyError: when a sha in ``to_remove`` is not a known graftpoint
    """
    graftpoints = self._graftpoints
    for commit_sha in to_remove:
        graftpoints.pop(commit_sha)

1314 

def _read_heads(self, name: str) -> list[ObjectID]:
    """Read object ids, one per line, from a named file.

    Args:
      name: Name of the file, as understood by ``get_named_file``
    Returns: The stripped non-blank lines as object ids, in file order;
      an empty list when the file does not exist
    """
    heads_file = self.get_named_file(name)
    if heads_file is None:
        return []
    with heads_file:
        stripped_lines = [line.strip() for line in heads_file.readlines()]
    return [ObjectID(entry) for entry in stripped_lines if entry]

1321 

def get_worktree(self) -> "WorkTree":
    """Return the working tree of this repository.

    This base implementation has no working tree and always raises.

    Returns:
      WorkTree instance for performing working tree operations

    Raises:
      NotImplementedError: If the repository doesn't support working trees
    """
    message = "Working tree operations not supported by this repository type"
    raise NotImplementedError(message)

1334 

1335 

def read_gitfile(f: BinaryIO) -> str:
    """Parse the contents of a ``.git`` file.

    The whole file is read. It must begin with ``gitdir: ``; everything
    after that prefix, minus trailing CR/LF characters, is decoded as
    UTF-8 and returned.

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: when the contents do not start with ``gitdir: ``
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    raw_path = contents[len(prefix) :]
    return raw_path.rstrip(b"\r\n").decode("utf-8")

1349 

1350 

class UnsupportedVersion(Exception):
    """Raised for a repository format version that is not supported."""

    def __init__(self, version: int) -> None:
        """Record the offending repository format version.

        Args:
          version: The unsupported repository version; kept on the
            ``version`` attribute
        """
        self.version = version

1361 

1362 

class UnsupportedExtension(Exception):
    """Raised for a repository extension that is not supported."""

    def __init__(self, extension: str) -> None:
        """Record the offending repository extension.

        Args:
          extension: The unsupported repository extension; kept on the
            ``extension`` attribute
        """
        self.extension = extension

1373 

1374 

1375class Repo(BaseRepo): 

1376 """A git repository backed by local disk. 

1377 

1378 To open an existing repository, call the constructor with 

1379 the path of the repository. 

1380 

1381 To create a new repository, use the Repo.init class method. 

1382 

1383 Note that a repository object may hold on to resources such 

1384 as file handles for performance reasons; call .close() to free 

1385 up those resources. 

1386 

1387 Attributes: 

1388 path: Path to the working copy (if it exists) or repository control 

1389 directory (if the repository is bare) 

1390 bare: Whether this is a bare repository 

1391 """ 

1392 

1393 path: str 

1394 bare: bool 

1395 object_store: DiskObjectStore 

1396 filter_context: "FilterContext | None" 

1397 

def __init__(
    self,
    root: str | bytes | os.PathLike[str],
    object_store: PackBasedObjectStore | None = None,
    bare: bool | None = None,
) -> None:
    """Open a repository on disk.

    Locates the control and common directories, sets up the refs
    container, validates the repository format version and extensions,
    creates the default object store when none is given, loads
    graftpoints and registers the shell hooks.

    Args:
      root: Path to the repository's root.
      object_store: ObjectStore to use; if omitted, we use the
        repository's default object store
      bare: True if this is a bare repository. When None, bareness is
        detected from the directory layout under ``root``.

    Raises:
      NotGitRepository: ``bare`` is None and ``root`` contains neither a
        control directory/file nor the layout of a bare repository
      UnsupportedVersion: ``core.repositoryformatversion`` is not 0 or 1
      UnsupportedExtension: the config lists an extension other than
        refStorage=reftable, worktreeConfig, objectFormat or
        relativeWorktrees
    """
    root = os.fspath(root)
    if isinstance(root, bytes):
        root = os.fsdecode(root)
    hidden_path = os.path.join(root, CONTROLDIR)
    if bare is None:
        # Non-bare: the control entry is a file, or a directory that
        # holds an objects directory.
        if os.path.isfile(hidden_path) or os.path.isdir(
            os.path.join(hidden_path, OBJECTDIR)
        ):
            bare = False
        # Bare: objects and refs directories live directly under root.
        elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
            os.path.join(root, REFSDIR)
        ):
            bare = True
        else:
            raise NotGitRepository(
                "No git repository was found at {path}".format(**dict(path=root))
            )

    self.bare = bare
    if bare is False:
        if os.path.isfile(hidden_path):
            # The control entry is a gitfile: it names the real control
            # directory, resolved relative to root.
            with open(hidden_path, "rb") as f:
                path = read_gitfile(f)
            self._controldir = os.path.join(root, path)
        else:
            self._controldir = hidden_path
    else:
        self._controldir = root
    # A COMMONDIR file inside the control directory redirects the common
    # directory; its content is resolved relative to the control directory.
    commondir = self.get_named_file(COMMONDIR)
    if commondir is not None:
        with commondir:
            self._commondir = os.path.join(
                self.controldir(),
                os.fsdecode(commondir.read().rstrip(b"\r\n")),
            )
    else:
        self._commondir = self._controldir
    self.path = root

    # Initialize refs early so they're available for config condition matchers
    self.refs = DiskRefsContainer(
        self.commondir(), self._controldir, logger=self._write_reflog
    )

    # Initialize worktrees container
    from .worktree import WorkTreeContainer

    self.worktrees = WorkTreeContainer(self)

    config = self.get_config()
    # A missing core.repositoryformatversion is treated as version 0.
    try:
        repository_format_version = config.get("core", "repositoryformatversion")
        format_version = (
            0
            if repository_format_version is None
            else int(repository_format_version)
        )
    except KeyError:
        format_version = 0

    if format_version not in (0, 1):
        raise UnsupportedVersion(format_version)

    # Track extensions we encounter
    has_reftable_extension = False
    for extension, value in config.items((b"extensions",)):
        if extension.lower() == b"refstorage":
            # reftable is the only accepted refStorage value.
            if value == b"reftable":
                has_reftable_extension = True
            else:
                raise UnsupportedExtension(f"refStorage = {value.decode()}")
        elif extension.lower() not in (
            b"worktreeconfig",
            b"objectformat",
            b"relativeworktrees",
        ):
            raise UnsupportedExtension(extension.decode("utf-8"))

    if object_store is None:
        # Get shared repository permissions from config
        try:
            shared_value = config.get(("core",), "sharedRepository")
            file_mode, dir_mode = parse_shared_repository(shared_value)
        except KeyError:
            file_mode, dir_mode = None, None

        object_store = DiskObjectStore.from_config(
            os.path.join(self.commondir(), OBJECTDIR),
            config,
            file_mode=file_mode,
            dir_mode=dir_mode,
        )

    # Use reftable if extension is configured
    if has_reftable_extension:
        from .reftable import ReftableRefsContainer

        self.refs = ReftableRefsContainer(self.commondir())
        # Update worktrees container after refs change
        self.worktrees = WorkTreeContainer(self)
    BaseRepo.__init__(self, object_store, self.refs)

    # Determine hash algorithm from config if not already set
    if self.object_format is None:
        from .object_format import DEFAULT_OBJECT_FORMAT, get_object_format

        # extensions.objectformat is only consulted for format version 1.
        if format_version == 1:
            try:
                object_format = config.get((b"extensions",), b"objectformat")
                self.object_format = get_object_format(
                    object_format.decode("ascii")
                )
            except KeyError:
                self.object_format = DEFAULT_OBJECT_FORMAT
        else:
            self.object_format = DEFAULT_OBJECT_FORMAT

    # Graftpoints come from info/grafts and from the shallow file, both
    # read from the common directory and parsed by parse_graftpoints.
    self._graftpoints = {}
    graft_file = self.get_named_file(
        os.path.join("info", "grafts"), basedir=self.commondir()
    )
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))
    graft_file = self.get_named_file("shallow", basedir=self.commondir())
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))

    # Register the shell hooks; pre-commit additionally receives the
    # repository path.
    self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
    self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
    self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
    self.hooks["pre-receive"] = PreReceiveShellHook(self.controldir())
    self.hooks["update"] = UpdateShellHook(self.controldir())
    self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

    # Initialize filter context as None, will be created lazily
    self.filter_context = None

1550 

def get_worktree(self) -> "WorkTree":
    """Return the working tree of this repository.

    Returns:
      A new WorkTree bound to this repository and rooted at ``self.path``
    """
    from .worktree import WorkTree

    worktree = WorkTree(self, self.path)
    return worktree

1560 

def _write_reflog(
    self,
    ref: bytes,
    old_sha: bytes,
    new_sha: bytes,
    committer: bytes | None,
    timestamp: int | None,
    timezone: int | None,
    message: bytes,
) -> None:
    """Append one entry to the reflog of ``ref``.

    Missing parent directories of the reflog file are created one level
    at a time. When shared-repository permissions are configured they
    are applied to each created directory and to the reflog file.

    Args:
      ref: Name of the ref whose reflog is written
      old_sha: Previous value of the ref
      new_sha: New value of the ref
      committer: Identity to record; when None it is taken from the
        config stack via ``get_user_identity``
      timestamp: Time of the change; defaults to the current time
      timezone: Timezone offset of the change; defaults to 0
      message: Reflog message
    """
    from .reflog import format_reflog_line

    path = self._reflog_path(ref)

    # Shared-repository permissions, if configured.
    file_mode, dir_mode = self._get_shared_repository_permissions()

    # Walk upwards collecting every directory that does not exist yet,
    # then create them outermost first so each level can be chmod-ed.
    missing_dirs = []
    candidate = os.path.dirname(path)
    while candidate and not os.path.exists(candidate):
        missing_dirs.append(candidate)
        candidate = os.path.dirname(candidate)
    for directory in reversed(missing_dirs):
        os.mkdir(directory)
        if dir_mode is not None:
            os.chmod(directory, dir_mode)

    if committer is None:
        committer = get_user_identity(self.get_config_stack())
    check_user_identity(committer)
    if timestamp is None:
        timestamp = int(time.time())
    if timezone is None:
        timezone = 0  # FIXME

    with open(path, "ab") as f:
        line = format_reflog_line(
            old_sha, new_sha, committer, timestamp, timezone, message
        )
        f.write(line + b"\n")

    # open() respects the umask, so chmod is needed to get the exact
    # mode. It is done on every write so a pre-existing file is fixed too.
    if file_mode is not None:
        os.chmod(path, file_mode)

1611 

def _reflog_path(self, ref: bytes) -> str:
    """Return the filesystem path of the reflog for ``ref``.

    Per-worktree refs (as decided by ``is_per_worktree_ref``) are logged
    under the control directory, all other refs under the common
    directory.

    Args:
      ref: Ref name
    Returns: Path of the reflog file below the ``logs`` directory
    Raises:
      NotImplementedError: for refs starting with ``main-worktree/`` or
        ``worktrees/``
    """
    if ref.startswith((b"main-worktree/", b"worktrees/")):
        raise NotImplementedError(f"refs {ref.decode()} are not supported")

    if is_per_worktree_ref(ref):
        base = self.controldir()
    else:
        base = self.commondir()
    return os.path.join(base, "logs", os.fsdecode(ref))

1618 

def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
    """Iterate over the reflog entries of a reference.

    A reference without a reflog file yields nothing.

    Args:
      ref: Reference name (e.g. b'HEAD', b'refs/heads/master')

    Yields:
      reflog.Entry objects in chronological order (oldest first)
    """
    from .reflog import read_reflog

    reflog_file_path = self._reflog_path(ref)
    try:
        with open(reflog_file_path, "rb") as reflog_file:
            yield from read_reflog(reflog_file)
    except FileNotFoundError:
        # No reflog has been written for this ref.
        return

1636 

@classmethod
def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
    """Iterate parent directories to discover a repository.

    Return a Repo object for the first parent directory that looks like a
    Git repository.

    Args:
      start: The directory to start discovery from (defaults to '.')
    Raises:
      NotGitRepository: when neither ``start`` nor any of its parent
        directories can be opened as a repository
    """
    path = os.path.abspath(start)
    while True:
        try:
            return cls(path)
        except NotGitRepository:
            parent, _tail = os.path.split(path)
            if parent == path:  # Root reached
                break
            path = parent
    # os.fsdecode accepts str, bytes and path-like objects and uses the
    # filesystem encoding, so a bytes path that is not valid UTF-8 still
    # yields NotGitRepository rather than a UnicodeDecodeError.
    raise NotGitRepository(
        f"No git repository was found at {os.fsdecode(start)}"
    )

1660 

def controldir(self) -> str:
    """Return the control directory path stored on this repository."""
    control_directory = self._controldir
    return control_directory

1664 

def commondir(self) -> str:
    """Return the common directory path stored on this repository.

    For a main working tree this is the same as controldir(); for a
    linked working tree it is the control directory of the main working
    tree.
    """
    common_directory = self._commondir
    return common_directory

1674 

def _determine_file_mode(self) -> bool:
    """Probe the file-system to determine whether permissions can be trusted.

    A temporary ``.probe-permissions`` file is created in ``self.path``,
    its user-execute bit is toggled, and the mode is read back. The probe
    file is always removed again, including when ``chmod`` is refused.

    Returns: True if permissions can be trusted, False otherwise.
    """
    fname = os.path.join(self.path, ".probe-permissions")
    with open(fname, "w") as f:
        f.write("")

    try:
        st1 = os.lstat(fname)
        try:
            os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
        except PermissionError:
            return False
        st2 = os.lstat(fname)
    finally:
        # Never leave the probe file behind in the working tree.
        os.unlink(fname)

    # Trust permissions only when the toggle actually changed the mode
    # and the execute bit is now reported as set.
    mode_differs = st1.st_mode != st2.st_mode
    st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0

    return mode_differs and st2_has_exec

1697 

def _determine_symlinks(self) -> bool:
    """Decide whether symlinks can be created.

    The filesystem is not probed; the answer depends only on the
    platform.

    Returns: True if symlinks can be created, False otherwise.
    """
    # TODO(jelmer): Actually probe disk / look at filesystem
    on_windows = sys.platform == "win32"
    return not on_windows

1705 

def _get_shared_repository_permissions(
    self,
) -> tuple[int | None, int | None]:
    """Read the shared repository permissions from the configuration.

    ``core.sharedRepository`` is looked up in the repository config and
    handed to ``parse_shared_repository``.

    Returns:
      tuple of (file_mask, directory_mask) or (None, None) if not shared
    """
    try:
        shared_value = self.get_config().get(("core",), "sharedRepository")
        return parse_shared_repository(shared_value)
    except KeyError:
        # No core.sharedRepository setting.
        return (None, None)

1720 

def _put_named_file(self, path: str, contents: bytes) -> None:
    """Write ``contents`` to a named file inside the control dir.

    When shared-repository permissions are configured, the file mask is
    passed on to ``GitFile``.

    Args:
      path: The path to the file, relative to the control dir.
      contents: A string to write to the file.
    """
    path = path.lstrip(os.path.sep)

    # Get shared repository permissions
    file_mode, _ = self._get_shared_repository_permissions()

    target = os.path.join(self.controldir(), path)
    # Only pass a mask when one is configured.
    mask_kwargs = {} if file_mode is None else {"mask": file_mode}
    with GitFile(target, "wb", **mask_kwargs) as f:
        f.write(contents)

1742 

def _del_named_file(self, path: str) -> None:
    """Delete a named file from the control dir, ignoring a missing file.

    Args:
      path: The path to the file, relative to the control dir.
    """
    try:
        target = os.path.join(self.controldir(), path)
        os.unlink(target)
    except FileNotFoundError:
        # Nothing to delete.
        pass

1748 

def get_named_file(
    self,
    path: str | bytes,
    basedir: str | None = None,
) -> BinaryIO | None:
    """Open a named file from the control dir.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-based Repo, the object returned need not be
    pointing to a file in that location.

    Args:
      path: The path to the file, relative to the control dir. Bytes are
        decoded as UTF-8 and leading path separators are stripped.
      basedir: Optional argument that specifies an alternative to the
        control dir.
    Returns: An open file object, or None if the file does not exist.
    """
    # TODO(dborowitz): sanitize filenames, since this is used directly by
    # the dumb web serving code.
    base = self.controldir() if basedir is None else basedir
    relative = path.decode("utf-8") if isinstance(path, bytes) else path
    relative = relative.lstrip(os.path.sep)
    try:
        # The caller owns the returned handle and must close it.
        return open(os.path.join(base, relative), "rb")
    except FileNotFoundError:
        return None

1777 

def index_path(self) -> str:
    """Return the path of the index file inside the control directory."""
    control_directory = self.controldir()
    return os.path.join(control_directory, INDEX_FILENAME)

1781 

def open_index(self) -> "Index":
    """Open the index of this repository.

    ``index.version`` and ``index.skipHash`` are read from the config
    stack. When ``feature.manyFiles`` is enabled their fallbacks become
    version 4 and skipHash=true; otherwise the fallbacks are no explicit
    version and skipHash=false.

    Raises:
      NoIndexPresent: If no index is present
    Returns: The matching `Index`
    """
    from .index import Index

    if not self.has_index():
        raise NoIndexPresent

    config = self.get_config_stack()

    # feature.manyFiles only changes the fallbacks; explicit index.*
    # settings still take precedence.
    if config.get_boolean(b"feature", b"manyFiles", False):
        fallback_version, fallback_skip_hash = 4, True
    else:
        fallback_version, fallback_skip_hash = None, False

    try:
        index_version = int(config.get(b"index", b"version"))
    except KeyError:
        index_version = fallback_version
    skip_hash = config.get_boolean(b"index", b"skipHash", fallback_skip_hash)

    # Get shared repository permissions for index file
    file_mode, _ = self._get_shared_repository_permissions()

    return Index(
        self.index_path(),
        skip_hash=skip_hash,
        version=index_version,
        file_mode=file_mode,
    )

1826 

def has_index(self) -> bool:
    """Report whether this repository has an index."""
    # Bare repos must never have index files; non-bare repos may have a
    # missing index file, which is treated as empty.
    is_bare = self.bare
    return not is_bare

1832 

def clone(
    self,
    target_path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = True,
    bare: bool = False,
    origin: bytes = b"origin",
    checkout: bool | None = None,
    branch: bytes | None = None,
    progress: Callable[[str], None] | None = None,
    depth: int | None = None,
    symlinks: bool | None = None,
) -> "Repo":
    """Clone this repository.

    The target repository is initialised, a remote named ``origin``
    pointing back at this repository is written to its config, objects
    are fetched, branches are imported under ``refs/remotes/<origin>``
    and tags under ``refs/tags``, and HEAD is set up. On any failure the
    target is closed and, when this call created the directory, the
    directory is removed again.

    Args:
      target_path: Target path
      mkdir: Create the target directory
      bare: Whether to create a bare repository
      checkout: Whether or not to check-out HEAD after cloning
        (defaults to True for non-bare clones)
      origin: Base name for refs in target repository
        cloned from this repository
      branch: Optional branch or tag to be used as HEAD in the new repository
        instead of this repository's HEAD.
      progress: Optional progress function
      depth: Depth at which to fetch
      symlinks: Symlinks setting (default to autodetect)
    Returns: Created repository as `Repo`
    Raises:
      ValueError: when ``checkout`` is requested together with ``bare``
    """
    encoded_path = os.fsencode(self.path)

    if mkdir:
        os.mkdir(target_path)

    try:
        if not bare:
            target = Repo.init(target_path, symlinks=symlinks)
            if checkout is None:
                checkout = True
        else:
            if checkout:
                raise ValueError("checkout and bare are incompatible")
            target = Repo.init_bare(target_path)

        try:
            # Record this repository as the remote of the clone.
            target_config = target.get_config()
            target_config.set((b"remote", origin), b"url", encoded_path)
            target_config.set(
                (b"remote", origin),
                b"fetch",
                b"+refs/heads/*:refs/remotes/" + origin + b"/*",
            )
            target_config.write_to_path()

            ref_message = b"clone: from " + encoded_path
            self.fetch(target, depth=depth)
            # Branches become remote-tracking refs; tags are copied as-is.
            target.refs.import_refs(
                Ref(b"refs/remotes/" + origin),
                self.refs.as_dict(Ref(b"refs/heads")),
                message=ref_message,
            )
            target.refs.import_refs(
                Ref(b"refs/tags"),
                self.refs.as_dict(Ref(b"refs/tags")),
                message=ref_message,
            )

            head_chain, origin_sha = self.refs.follow(HEADREF)
            origin_head = head_chain[-1] if head_chain else None
            # NOTE(review): `head` is only assigned in the else branch
            # below; confirm the detached-HEAD branch cannot be followed
            # by the `head is not None` check with `head` unbound.
            if origin_sha and not origin_head:
                # set detached HEAD
                target.refs[HEADREF] = origin_sha
            else:
                _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

            if checkout and head is not None:
                target.get_worktree().reset_index()
        except BaseException:
            # Release the half-built target before propagating.
            target.close()
            raise
    except BaseException:
        # Only remove the directory if this call created it.
        if mkdir:
            import shutil

            shutil.rmtree(target_path)
        raise
    return target

1929 

def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
    """Build the matchers used to evaluate ``includeIf`` conditions.

    Returns:
      Mapping from condition prefix (``onbranch:``, ``gitdir:`` and
      ``gitdir/i:``) to a callable that takes the pattern text and
      reports whether this repository satisfies it.
    """
    from pathlib import Path

    from .config import ConditionMatcher, match_glob_pattern

    def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
        """Check the control directory of this repository against a pattern.

        Args:
          pattern: Pattern to match against
          case_sensitive: Whether to match case-sensitively

        Returns:
          True if the resolved control directory matches the pattern
        """
        # "./" patterns are relative to the including config file, and
        # that location is not known here.
        if pattern.startswith("./"):
            return False

        try:
            resolved_gitdir = str(Path(self._controldir).resolve())
        except (OSError, ValueError):
            return False

        # Expand "~" first, then switch to forward slashes.
        normalized = os.path.expanduser(pattern).replace("\\", "/")

        anchored = normalized.startswith(("~/", "./", "/", "**"))
        # A second character of ":" is treated as a Windows drive prefix.
        has_drive = len(normalized) >= 2 and normalized[1] == ":"
        if not anchored and not has_drive:
            normalized = "**/" + normalized
        if normalized.endswith("/"):
            normalized += "**"

        from .config import _match_gitdir_pattern

        return _match_gitdir_pattern(
            resolved_gitdir.encode("utf-8", errors="replace"),
            normalized.encode("utf-8", errors="replace"),
            ignorecase=not case_sensitive,
        )

    def match_onbranch(pattern: str) -> bool:
        """Check the branch HEAD resolves to against a glob pattern.

        Args:
          pattern: Pattern to match against

        Returns:
          True if HEAD resolves to a ``refs/heads/`` ref whose branch
          name matches the pattern, False otherwise
        """
        try:
            chain, _ = self.refs.follow(HEADREF)
            resolved = chain[-1]  # last entry is the fully resolved ref
        except KeyError:
            return False
        if not resolved or not resolved.startswith(b"refs/heads/"):
            return False
        branch_name = extract_branch_name(resolved).decode(
            "utf-8", errors="replace"
        )
        return match_glob_pattern(branch_name, pattern)

    condition_matchers: dict[str, ConditionMatcher] = {
        "onbranch:": match_onbranch,
        "gitdir:": lambda pattern: match_gitdir(pattern, True),
        "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
    }
    return condition_matchers

2017 

def get_worktree_config(self) -> "ConfigFile":
    """Load ``config.worktree`` from the common directory.

    Returns:
      ConfigFile read from disk, or an empty ConfigFile whose ``path``
      points at the expected location when the file does not exist.
    """
    from .config import ConfigFile

    worktree_config_path = os.path.join(self.commondir(), "config.worktree")
    try:
        # The matchers let the parser evaluate includeIf sections.
        return ConfigFile.from_path(
            worktree_config_path,
            condition_matchers=self._get_config_condition_matchers(),
        )
    except FileNotFoundError:
        empty_config = ConfigFile()
        empty_config.path = worktree_config_path
        return empty_config

2035 

def get_config(self) -> "ConfigFile":
    """Load the ``config`` file from the common directory.

    Returns:
      `ConfigFile` read from disk, or an empty `ConfigFile` whose
      ``path`` points at the expected location when the file is missing.
    """
    from .config import ConfigFile

    config_path = os.path.join(self._commondir, "config")
    try:
        # The matchers let the parser evaluate includeIf sections.
        return ConfigFile.from_path(
            config_path,
            condition_matchers=self._get_config_condition_matchers(),
        )
    except FileNotFoundError:
        empty_config = ConfigFile()
        empty_config.path = config_path
        return empty_config

2052 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Return the rebase state manager for this on-disk repository.

    Returns:
      DiskRebaseStateManager rooted at ``rebase-merge`` inside the
      control directory
    """
    from .rebase import DiskRebaseStateManager

    return DiskRebaseStateManager(
        os.path.join(self.controldir(), "rebase-merge")
    )

2064 

def get_description(self) -> bytes | None:
    """Read the ``description`` file from the control directory.

    Returns:
      Raw file contents as bytes, or None when the file does not exist.
    """
    description_path = os.path.join(self._controldir, "description")
    try:
        with GitFile(description_path, "rb") as description_file:
            contents = description_file.read()
    except FileNotFoundError:
        return None
    return contents

2076 

2077 def __repr__(self) -> str: 

2078 """Return string representation of this repository.""" 

2079 return f"<Repo at {self.path!r}>" 

2080 

def set_description(self, description: bytes) -> None:
    """Store a new description for this repository.

    The bytes are written through ``_put_named_file`` under the name
    ``description``.

    Args:
      description: Text to set as description for this repository.
    """
    self._put_named_file("description", description)

2088 

@classmethod
def _init_maybe_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    controldir: str | bytes | os.PathLike[str],
    bare: bool,
    object_store: PackBasedObjectStore | None = None,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Populate a control directory and return the repository for it.

    Shared implementation behind ``init`` and ``init_bare``. The
    control directory itself must already exist.

    Args:
      path: Path handed to the repository constructor
      controldir: Directory that receives the base directories
      bare: Whether the repository is bare
      object_store: Object store to use; one is created under the
        control directory when omitted
      config: Configuration consulted for ``init.defaultBranch``
      default_branch: Default branch name
      symlinks: Forwarded to ``_init_files``
      format: Forwarded to ``_init_files``
      shared_repository: Shared repository setting, used for directory
        and file modes and forwarded to ``_init_files``
      object_format: Object format name
    Returns: `Repo` instance
    """
    # os.fsdecode accepts str, bytes and path-like objects and always
    # yields str.
    path = os.fsdecode(path)
    controldir = os.fsdecode(controldir)

    # Work out the permission bits before anything is created.
    file_mode: int | None = None
    dir_mode: int | None = None
    if shared_repository is not None:
        file_mode, dir_mode = parse_shared_repository(shared_repository)

    for components in BASE_DIRECTORIES:
        base_dir = os.path.join(controldir, *components)
        os.mkdir(base_dir)
        if dir_mode is not None:
            os.chmod(base_dir, dir_mode)

    from .object_format import get_object_format

    hash_alg = get_object_format(object_format)

    if object_store is None:
        object_store = DiskObjectStore.init(
            os.path.join(controldir, OBJECTDIR),
            file_mode=file_mode,
            dir_mode=dir_mode,
            object_format=hash_alg,
        )
    repo = cls(path, bare=bare, object_store=object_store)

    if default_branch is None:
        if config is None:
            from .config import StackedConfig

            config = StackedConfig.default()
        try:
            default_branch = config.get("init", "defaultBranch")
        except KeyError:
            default_branch = DEFAULT_BRANCH
    repo.refs.set_symbolic_ref(HEADREF, local_branch_name(default_branch))

    repo._init_files(
        bare=bare,
        symlinks=symlinks,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )
    return repo

2154 

@classmethod
def init(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new non-bare repository.

    The control directory is created inside ``path`` and marked hidden
    via ``_set_filesystem_hidden``.

    Args:
      path: Path in which to create the repository
      mkdir: Whether to create the directory
      config: Configuration object
      default_branch: Default branch name
      symlinks: Whether to support symlinks
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: `Repo` instance
    """
    # os.fsdecode accepts str, bytes and path-like objects.
    worktree_path = os.fsdecode(path)
    if mkdir:
        os.mkdir(worktree_path)

    controldir = os.path.join(worktree_path, CONTROLDIR)
    os.mkdir(controldir)
    _set_filesystem_hidden(controldir)

    options = {
        "config": config,
        "default_branch": default_branch,
        "symlinks": symlinks,
        "format": format,
        "shared_repository": shared_repository,
        "object_format": object_format,
    }
    return cls._init_maybe_bare(worktree_path, controldir, False, **options)

2200 

@classmethod
def _init_new_working_directory(
    cls,
    path: str | bytes | os.PathLike[str],
    main_repo: "Repo",
    identifier: str | None = None,
    mkdir: bool = False,
) -> "Repo":
    """Create a new working directory linked to a repository.

    Writes a ``gitdir:`` pointer file into ``path``, creates the
    per-worktree control directory under the main repository's
    worktrees directory, fills in its gitdir/commondir/HEAD files and
    resets the index of the new working tree.

    Args:
      path: Path in which to create the working tree.
      main_repo: Main repository to reference
      identifier: Worktree identifier; defaults to the last component
        of ``path``
      mkdir: Whether to create the directory
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    if identifier is None:
        # os.path.basename("wt/") is "", which would place the control
        # files directly in the shared worktrees directory. Normalising
        # through abspath drops trailing separators (and resolves ".")
        # before the last component is taken.
        identifier = os.path.basename(os.path.abspath(path))
    # Ensure we use absolute path for the worktree control directory
    main_controldir = os.path.abspath(main_repo.controldir())
    main_worktreesdir = os.path.join(main_controldir, WORKTREES)
    worktree_controldir = os.path.join(main_worktreesdir, identifier)
    gitdirfile = os.path.join(path, CONTROLDIR)
    with open(gitdirfile, "wb") as f:
        f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

    # Get shared repository permissions from main repository
    _, dir_mode = main_repo._get_shared_repository_permissions()

    # Create directories with appropriate permissions; an already
    # existing directory is left untouched.
    try:
        os.mkdir(main_worktreesdir)
        if dir_mode is not None:
            os.chmod(main_worktreesdir, dir_mode)
    except FileExistsError:
        pass
    try:
        os.mkdir(worktree_controldir)
        if dir_mode is not None:
            os.chmod(worktree_controldir, dir_mode)
    except FileExistsError:
        pass
    with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdirfile) + b"\n")
    with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
        # worktrees/<identifier> sits two levels below the main control dir
        f.write(b"../..\n")
    with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
        f.write(main_repo.head() + b"\n")
    r = cls(os.path.normpath(path))
    r.get_worktree().reset_index()
    return r

2258 

@classmethod
def init_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    object_store: PackBasedObjectStore | None = None,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new bare repository.

    Unless ``mkdir`` is set, ``path`` should already exist and be an
    empty directory. The same directory serves as the control directory.

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create the directory
      object_store: Object store to use
      config: Configuration object
      default_branch: Default branch name
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: a `Repo` instance
    """
    # os.fsdecode accepts str, bytes and path-like objects.
    repo_path = os.fsdecode(path)
    if mkdir:
        os.mkdir(repo_path)

    options = {
        "object_store": object_store,
        "config": config,
        "default_branch": default_branch,
        "format": format,
        "shared_repository": shared_repository,
        "object_format": object_format,
    }
    return cls._init_maybe_bare(repo_path, repo_path, True, **options)


# Alias for init_bare.
create = init_bare

2305 

def close(self) -> None:
    """Close the object store and, if one was created, the filter context."""
    self.object_store.close()

    context = self.filter_context
    if context is None:
        return
    context.close()
    self.filter_context = None

2313 

2314 def __enter__(self) -> "Repo": 

2315 """Enter context manager.""" 

2316 return self 

2317 

2318 def __exit__( 

2319 self, 

2320 exc_type: type[BaseException] | None, 

2321 exc_val: BaseException | None, 

2322 exc_tb: TracebackType | None, 

2323 ) -> None: 

2324 """Exit context manager and close repository.""" 

2325 self.close() 

2326 

2327 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

2328 """Read .gitattributes file from working tree. 

2329 

2330 Returns: 

2331 Dictionary mapping file patterns to attributes 

2332 """ 

2333 gitattributes = {} 

2334 gitattributes_path = os.path.join(self.path, ".gitattributes") 

2335 

2336 if os.path.exists(gitattributes_path): 

2337 with open(gitattributes_path, "rb") as f: 

2338 for line in f: 

2339 line = line.strip() 

2340 if not line or line.startswith(b"#"): 

2341 continue 

2342 

2343 parts = line.split() 

2344 if len(parts) < 2: 

2345 continue 

2346 

2347 pattern = parts[0] 

2348 attrs = {} 

2349 

2350 for attr in parts[1:]: 

2351 if attr.startswith(b"-"): 

2352 # Unset attribute 

2353 attrs[attr[1:]] = b"false" 

2354 elif b"=" in attr: 

2355 # Set to value 

2356 key, value = attr.split(b"=", 1) 

2357 attrs[key] = value 

2358 else: 

2359 # Set attribute 

2360 attrs[attr] = b"true" 

2361 

2362 gitattributes[pattern] = attrs 

2363 

2364 return gitattributes 

2365 

def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a FilterBlobNormalizer built from current config and attributes.

    The repository keeps a single filter context: it is created on the
    first call and refreshed with the current config on later calls.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Both are re-read on every call.
    stack = self.get_config_stack()
    attributes = self.get_gitattributes()

    context = self.filter_context
    if context is not None:
        # Pick up config changes made since the context was created.
        context.refresh_config(stack)
    else:
        self.filter_context = FilterContext(FilterRegistry(stack, self))

    return FilterBlobNormalizer(
        stack, attributes, filter_context=self.filter_context
    )

2386 

def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Collect gitattributes patterns for the repository.

    Patterns are gathered, in this order, from ``.gitattributes`` in
    the given tree, ``info/attributes`` in the control directory, and
    ``.gitattributes`` in the working directory.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths

    Raises:
      ValueError: If HEAD, after peeling tags, is not a commit
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    collected = []

    def _collect(stream) -> None:
        # Append every (Pattern, attrs) pair parsed from the stream.
        collected.extend(
            (Pattern(raw_pattern), attrs)
            for raw_pattern, attrs in parse_git_attributes(stream)
        )

    # System and global gitattributes are not read (TODO: implement this)

    if tree is None:
        try:
            head = self[b"HEAD"]
            # Peel tags to get to the underlying commit
            while isinstance(head, Tag):
                _cls, target = head.object
                head = self.get_object(target)
            if not isinstance(head, Commit):
                raise ValueError(
                    f"Expected HEAD to point to a Commit, got {type(head).__name__}. "
                    f"This usually means HEAD points to a {type(head).__name__} object "
                    f"instead of a Commit."
                )
            tree = head.tree
        except KeyError:
            # No HEAD, no attributes from tree
            pass

    if tree is not None:
        try:
            tree_obj = self[tree]
            assert isinstance(tree_obj, Tree)
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    _collect(BytesIO(attrs_blob.data))
        except (KeyError, NotTreeError):
            pass

    # info/attributes first, then the working directory copy.
    on_disk_sources = (
        os.path.join(self.controldir(), "info", "attributes"),
        os.path.join(self.path, ".gitattributes"),
    )
    for source_path in on_disk_sources:
        if os.path.exists(source_path):
            with open(source_path, "rb") as f:
                _collect(f)

    return GitAttributes(collected)

2459 

2460 

2461class MemoryRepo(BaseRepo): 

2462 """Repo that stores refs, objects, and named files in memory. 

2463 

2464 MemoryRepos are always bare: they have no working tree and no index, since 

2465 those have a stronger dependency on the filesystem. 

2466 """ 

2467 

2468 filter_context: "FilterContext | None" 

2469 

def __init__(self) -> None:
    """Set up an empty in-memory repository."""
    from .config import ConfigFile
    from .object_format import DEFAULT_OBJECT_FORMAT

    # Ref updates are logged here through _append_reflog.
    self._reflog: list[Any] = []
    refs = DictRefsContainer({}, logger=self._append_reflog)
    store = MemoryObjectStore()
    BaseRepo.__init__(self, store, refs)

    self._named_files: dict[str, bytes] = {}
    self.bare = True
    self._config = ConfigFile()
    self._description: bytes | None = None
    self.filter_context = None
    # Assigned after the base initialiser so the default format applies.
    self.object_format = DEFAULT_OBJECT_FORMAT

2485 

2486 def _append_reflog( 

2487 self, 

2488 ref: bytes, 

2489 old_sha: bytes | None, 

2490 new_sha: bytes | None, 

2491 committer: bytes | None, 

2492 timestamp: int | None, 

2493 timezone: int | None, 

2494 message: bytes | None, 

2495 ) -> None: 

2496 self._reflog.append( 

2497 (ref, old_sha, new_sha, committer, timestamp, timezone, message) 

2498 ) 

2499 

def set_description(self, description: bytes) -> None:
    """Remember the description of this repository in memory.

    Args:
      description: Text to set as description
    """
    self._description = description

2507 

2508 def get_description(self) -> bytes | None: 

2509 """Get the description of this repository. 

2510 

2511 Returns: 

2512 Repository description as bytes 

2513 """ 

2514 return self._description 

2515 

2516 def _determine_file_mode(self) -> bool: 

2517 """Probe the file-system to determine whether permissions can be trusted. 

2518 

2519 Returns: True if permissions can be trusted, False otherwise. 

2520 """ 

2521 return sys.platform != "win32" 

2522 

2523 def _determine_symlinks(self) -> bool: 

2524 """Probe the file-system to determine whether permissions can be trusted. 

2525 

2526 Returns: True if permissions can be trusted, False otherwise. 

2527 """ 

2528 return sys.platform != "win32" 

2529 

2530 def _put_named_file(self, path: str, contents: bytes) -> None: 

2531 """Write a file to the control dir with the given name and contents. 

2532 

2533 Args: 

2534 path: The path to the file, relative to the control dir. 

2535 contents: A string to write to the file. 

2536 """ 

2537 self._named_files[path] = contents 

2538 

2539 def _del_named_file(self, path: str) -> None: 

2540 try: 

2541 del self._named_files[path] 

2542 except KeyError: 

2543 pass 

2544 

2545 def get_named_file( 

2546 self, 

2547 path: str | bytes, 

2548 basedir: str | None = None, 

2549 ) -> BytesIO | None: 

2550 """Get a file from the control dir with a specific name. 

2551 

2552 Although the filename should be interpreted as a filename relative to 

2553 the control dir in a disk-baked Repo, the object returned need not be 

2554 pointing to a file in that location. 

2555 

2556 Args: 

2557 path: The path to the file, relative to the control dir. 

2558 basedir: Optional base directory for the path 

2559 Returns: An open file object, or None if the file does not exist. 

2560 """ 

2561 path_str = path.decode() if isinstance(path, bytes) else path 

2562 contents = self._named_files.get(path_str, None) 

2563 if contents is None: 

2564 return None 

2565 return BytesIO(contents) 

2566 

def open_index(self) -> "Index":
    """Refuse to open an index: this repository is bare and has none.

    Raises:
      NoIndexPresent: Always
    """
    raise NoIndexPresent

2574 

2575 def _init_config(self, config: "ConfigFile") -> None: 

2576 """Initialize repository configuration for MemoryRepo.""" 

2577 self._config = config 

2578 

def get_config(self) -> "ConfigFile":
    """Return the in-memory config object.

    Returns: the `ConfigFile` object held by this repository.
    """
    return self._config

2585 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Return the rebase state manager for this in-memory repository.

    Returns: MemoryRebaseStateManager instance bound to this repository
    """
    from .rebase import MemoryRebaseStateManager

    manager = MemoryRebaseStateManager(self)
    return manager

2594 

def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a FilterBlobNormalizer for checkin/checkout operations.

    The repository keeps a single filter context: it is created on the
    first call and refreshed with the current config on later calls.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Both are re-read on every call.
    stack = self.get_config_stack()
    attributes = self.get_gitattributes()

    context = self.filter_context
    if context is not None:
        # Pick up config changes made since the context was created.
        context.refresh_config(stack)
    else:
        self.filter_context = FilterContext(FilterRegistry(stack, self))

    return FilterBlobNormalizer(
        stack, attributes, filter_context=self.filter_context
    )

2615 

def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Return an empty set of gitattributes.

    Args:
      tree: Accepted for interface compatibility; not consulted

    Returns:
      GitAttributes object constructed from an empty pattern list
    """
    from .attrs import GitAttributes

    no_patterns: list = []
    return GitAttributes(no_patterns)

2623 

def close(self) -> None:
    """Release the filter context, if any, and then close the object store."""
    context = self.filter_context
    if context is not None:
        context.close()
        self.filter_context = None
    self.object_store.close()

2632 

def do_commit(
    self,
    message: bytes | None = None,
    committer: bytes | None = None,
    author: bytes | None = None,
    commit_timestamp: float | None = None,
    commit_timezone: int | None = None,
    author_timestamp: float | None = None,
    author_timezone: int | None = None,
    tree: ObjectID | None = None,
    encoding: bytes | None = None,
    ref: Ref | None = HEADREF,
    merge_heads: list[ObjectID] | None = None,
    no_verify: bool = False,
    sign: bool = False,
) -> bytes:
    """Create a new commit.

    This is a simplified implementation for in-memory repositories that
    doesn't support worktree operations or hooks.

    Args:
      message: Commit message. A callable is also accepted; it is
        called with ``(repo, commit)`` and must return the message.
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit timestamp)
      author_timezone: Author timestamp timezone (defaults to commit timezone)
      tree: SHA1 of the tree root to use
      encoding: Encoding
      ref: Optional ref to commit to (defaults to current branch).
        If None, creates a dangling commit without updating any ref.
      merge_heads: Merge heads
      no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
      sign: GPG Sign the commit (ignored for MemoryRepo)

    Returns:
      New commit SHA1

    Raises:
      ValueError: If no tree is given, the tree id has the wrong
        length, or no message is available
      CommitError: If ``ref`` changed while the commit was being made
    """
    import time

    from .objects import Commit

    if tree is None:
        raise ValueError("tree must be specified for MemoryRepo")

    c = Commit()
    if len(tree) != self.object_format.hex_length:
        raise ValueError(
            f"tree must be a {self.object_format.hex_length}-character hex sha string"
        )
    c.tree = tree

    config = self.get_config_stack()
    if merge_heads is None:
        merge_heads = []
    if committer is None:
        committer = get_user_identity(config, kind="COMMITTER")
    check_user_identity(committer)
    c.committer = committer
    if commit_timestamp is None:
        commit_timestamp = time.time()
    c.commit_time = int(commit_timestamp)
    if commit_timezone is None:
        commit_timezone = 0
    c.commit_timezone = commit_timezone
    if author is None:
        author = get_user_identity(config, kind="AUTHOR")
    check_user_identity(author)
    c.author = author
    if author_timestamp is None:
        author_timestamp = commit_timestamp
    c.author_time = int(author_timestamp)
    if author_timezone is None:
        author_timezone = commit_timezone
    c.author_timezone = author_timezone
    if encoding is None:
        try:
            encoding = config.get(("i18n",), "commitEncoding")
        except KeyError:
            pass
    if encoding is not None:
        c.encoding = encoding

    # A callable message is resolved against the partially built commit.
    if callable(message):
        message = message(self, c)
        if message is None:
            raise ValueError("Message callback returned None")

    if message is None:
        raise ValueError("No commit message specified")

    c.message = message

    if ref is None:
        # Create a dangling commit
        c.parents = merge_heads
        self.object_store.add_object(c)
    else:
        # Only the ref lookup is guarded: a KeyError raised while
        # storing the object or updating the ref must not be mistaken
        # for "ref does not exist yet".
        try:
            old_head = self.refs[ref]
        except KeyError:
            # Ref does not exist yet: only the merge heads are parents.
            c.parents = merge_heads
            self.object_store.add_object(c)
            ok = self.refs.add_if_new(
                ref,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        else:
            c.parents = [old_head, *merge_heads]
            self.object_store.add_object(c)
            ok = self.refs.set_if_equals(
                ref,
                old_head,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        if not ok:
            from .errors import CommitError

            raise CommitError(f"{ref!r} changed during commit")

    return c.id

2764 

@classmethod
def init_bare(
    cls,
    objects: Iterable[ShaFile],
    refs: Mapping[Ref, ObjectID],
    format: int | None = None,
    object_format: str | None = None,
) -> "MemoryRepo":
    """Create a new bare repository in memory.

    Args:
      objects: Objects for the new repository, as iterable
      refs: Refs as dictionary, mapping names to object SHA1s; each is
        added only if not already present
      format: Repository format version (defaults to 0)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: the populated `MemoryRepo`
    """
    repo = cls()
    store = repo.object_store
    for shafile in objects:
        store.add_object(shafile)
    for name, object_id in refs.items():
        repo.refs.add_if_new(name, object_id)
    repo._init_files(bare=True, format=format, object_format=object_format)
    return repo

2789 return ret