Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1134 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23 

24"""Repository access. 

25 

26This module contains the base class for git repositories 

27(BaseRepo) and an implementation which uses a repository on 

28local disk (Repo). 

29 

30""" 

31 

# Names exported by ``from dulwich.repo import *``.
__all__ = [
    "BASE_DIRECTORIES",
    "COMMONDIR",
    "CONTROLDIR",
    "DEFAULT_BRANCH",
    "DEFAULT_OFS_DELTA",
    "GITDIR",
    "INDEX_FILENAME",
    "OBJECTDIR",
    "REFSDIR",
    "REFSDIR_HEADS",
    "REFSDIR_TAGS",
    "WORKTREES",
    "BaseRepo",
    "DefaultIdentityNotFound",
    "InvalidUserIdentity",
    "MemoryRepo",
    "ParentsProvider",
    "Repo",
    "UnsupportedExtension",
    "UnsupportedVersion",
    "check_user_identity",
    "get_user_identity",
    "parse_graftpoints",
    "parse_shared_repository",
    "read_gitfile",
    "serialize_graftpoints",
]

60 

61import os 

62import stat 

63import sys 

64import time 

65import warnings 

66from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence 

67from io import BytesIO 

68from types import TracebackType 

69from typing import ( 

70 TYPE_CHECKING, 

71 Any, 

72 BinaryIO, 

73 TypeVar, 

74) 

75 

76if sys.version_info >= (3, 11): 

77 from typing import Self 

78else: 

79 from typing_extensions import Self 

80 

81if TYPE_CHECKING: 

82 # There are no circular imports here, but we try to defer imports as long 

83 # as possible to reduce start-up time for anything that doesn't need 

84 # these imports. 

85 from .attrs import GitAttributes 

86 from .config import ConditionMatcher, Config, ConfigFile, StackedConfig 

87 from .diff_tree import RenameDetector 

88 from .filters import FilterBlobNormalizer, FilterContext 

89 from .index import Index 

90 from .notes import Notes 

91 from .object_format import ObjectFormat 

92 from .object_store import BaseObjectStore, GraphWalker 

93 from .pack import UnpackedObject 

94 from .rebase import RebaseStateManager 

95 from .walk import Walker 

96 from .worktree import WorkTree 

97 

98from . import reflog 

99from .errors import ( 

100 NoIndexPresent, 

101 NotBlobError, 

102 NotCommitError, 

103 NotGitRepository, 

104 NotTagError, 

105 NotTreeError, 

106 RefFormatError, 

107) 

108from .file import GitFile 

109from .hooks import ( 

110 CommitMsgShellHook, 

111 Hook, 

112 PostCommitShellHook, 

113 PostReceiveShellHook, 

114 PreCommitShellHook, 

115 PreReceiveShellHook, 

116 UpdateShellHook, 

117) 

118from .object_store import ( 

119 DiskObjectStore, 

120 MemoryObjectStore, 

121 MissingObjectFinder, 

122 ObjectStoreGraphWalker, 

123 PackBasedObjectStore, 

124 PackCapableObjectStore, 

125 find_shallow, 

126 peel_sha, 

127) 

128from .objects import ( 

129 Blob, 

130 Commit, 

131 ObjectID, 

132 RawObjectID, 

133 ShaFile, 

134 Tag, 

135 Tree, 

136 check_hexsha, 

137 valid_hexsha, 

138) 

139from .pack import generate_unpacked_objects 

140from .refs import ( 

141 HEADREF, 

142 LOCAL_TAG_PREFIX, # noqa: F401 

143 SYMREF, # noqa: F401 

144 DictRefsContainer, 

145 DiskRefsContainer, 

146 Ref, 

147 RefsContainer, 

148 _set_default_branch, 

149 _set_head, 

150 _set_origin_head, 

151 check_ref_format, # noqa: F401 

152 extract_branch_name, 

153 is_per_worktree_ref, 

154 local_branch_name, 

155 read_packed_refs, # noqa: F401 

156 read_packed_refs_with_peeled, # noqa: F401 

157 write_packed_refs, # noqa: F401 

158) 

159 

# File and directory names; presumably resolved relative to a repository's
# control directory by code outside this view -- TODO confirm.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
# Fallback used by BaseRepo.generate_pack_data when ``ofs_delta`` is None.
DEFAULT_OFS_DELTA = True

# Type variable bound to ShaFile; BaseRepo._get_object uses it so that the
# return type follows the requested class.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Each entry is a list of path components.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

DEFAULT_BRANCH = b"master"

183 

184 

class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'.

    Attributes:
        identity: The identity string that was rejected
    """

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
            identity: The identity string that was rejected
        """
        # Hand the identity to Exception as well, so that str(exc),
        # repr(exc) and exc.args carry it instead of being empty.
        super().__init__(identity)
        self.identity = identity

191 

192 

class DefaultIdentityNotFound(Exception):
    """Raised when no username is available to build a default identity."""

195 

196 

# TODO(jelmer): Cache?
def _get_default_identity() -> tuple[str, str]:
    """Guess a (fullname, email) pair for the current user from the host.

    The username is the first non-empty value among the ``LOGNAME``,
    ``USER``, ``LNAME`` and ``USERNAME`` environment variables. When the
    ``pwd`` module is importable and has an entry for the current uid, that
    entry supplies the full name (the first comma-separated part of its
    GECOS field) and, if no environment variable matched, the username.
    The full name falls back to the username. The email is ``$EMAIL`` or,
    failing that, ``<username>@<hostname>``.

    Returns:
        Tuple of (fullname, email)

    Raises:
        DefaultIdentityNotFound: If a username is needed to fill in the full
            name or the email but none could be determined
    """
    import socket

    for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
        username = os.environ.get(name)
        if username:
            break
    else:
        username = None

    fullname: str | None = None
    try:
        import pwd
    except ImportError:
        # No passwd database on this platform; rely on the environment only.
        pass
    else:
        try:
            entry = pwd.getpwuid(os.getuid())  # type: ignore[attr-defined,unused-ignore]
        except KeyError:
            # The current uid has no passwd entry.
            pass
        else:
            # The passwd struct names the field ``pw_gecos``; probing for
            # "gecos" never matched, so the real name was always discarded.
            gecos = getattr(entry, "pw_gecos", None)
            if gecos:
                fullname = gecos.split(",")[0]
            if username is None:
                username = entry.pw_name
    if not fullname:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        fullname = username
    email = os.environ.get("EMAIL")
    if email is None:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        email = f"{username}@{socket.gethostname()}"
    return (fullname, email)

234 

235 

236def get_user_identity(config: "Config", kind: str | None = None) -> bytes: 

237 """Determine the identity to use for new commits. 

238 

239 If kind is set, this first checks 

240 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL. 

241 

242 If those variables are not set, then it will fall back 

243 to reading the user.name and user.email settings from 

244 the specified configuration. 

245 

246 If that also fails, then it will fall back to using 

247 the current users' identity as obtained from the host 

248 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f). 

249 

250 Args: 

251 config: Configuration stack to read from 

252 kind: Optional kind to return identity for, 

253 usually either "AUTHOR" or "COMMITTER". 

254 

255 Returns: 

256 A user identity 

257 """ 

258 user: bytes | None = None 

259 email: bytes | None = None 

260 if kind: 

261 user_uc = os.environ.get("GIT_" + kind + "_NAME") 

262 if user_uc is not None: 

263 user = user_uc.encode("utf-8") 

264 email_uc = os.environ.get("GIT_" + kind + "_EMAIL") 

265 if email_uc is not None: 

266 email = email_uc.encode("utf-8") 

267 if user is None: 

268 try: 

269 user = config.get(("user",), "name") 

270 except KeyError: 

271 user = None 

272 if email is None: 

273 try: 

274 email = config.get(("user",), "email") 

275 except KeyError: 

276 email = None 

277 default_user, default_email = _get_default_identity() 

278 if user is None: 

279 user = default_user.encode("utf-8") 

280 if email is None: 

281 email = default_email.encode("utf-8") 

282 if email.startswith(b"<") and email.endswith(b">"): 

283 email = email[1:-1] 

284 return user + b" <" + email + b">" 

285 

286 

def check_user_identity(identity: bytes) -> None:
    """Validate the shape of a user identity.

    An identity is accepted when it contains ``b" <"``, has a ``b">"``
    somewhere after the first ``b" <"``, and holds neither NUL bytes nor
    newlines.

    Args:
        identity: User identity bytestring
    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """

    def _rejection() -> Exception:
        # Build the error lazily so nothing is decoded for valid input.
        return InvalidUserIdentity(identity.decode("utf-8", "replace"))

    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        raise _rejection() from exc
    if b">" not in remainder:
        raise _rejection()
    if any(forbidden in identity for forbidden in (b"\0", b"\n")):
        raise _rejection()

303 

304 

def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[ObjectID, list[ObjectID]]:
    """Build a graft mapping from graftpoint lines.

    Args:
        graftpoints: Iterator of graftpoint lines

    Every line has the form:
        <commit sha1> <parent sha1> [<parent sha1>]*

    and contributes one entry to the result:
        <commit sha1>: [<parent sha1>*]

    Each SHA on a line is validated with ``check_hexsha``.

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    result: dict[ObjectID, list[ObjectID]] = {}
    for entry in graftpoints:
        fields = entry.split(None, 1)

        commit_sha = ObjectID(fields[0])
        parent_shas = (
            [ObjectID(token) for token in fields[1].split()]
            if len(fields) == 2
            else []
        )

        # Validate the commit first, then its parents in order.
        check_hexsha(commit_sha, "Invalid graftpoint")
        for parent_sha in parent_shas:
            check_hexsha(parent_sha, "Invalid graftpoint")

        result[commit_sha] = parent_shas
    return result

336 

337 

def serialize_graftpoints(graftpoints: Mapping[ObjectID, Sequence[ObjectID]]) -> bytes:
    """Render a graft mapping as newline-separated graftpoint lines.

    The input maps:
        <commit sha1>: [<parent sha1>*]

    and every entry becomes one line of the form:
        <commit sha1> <parent sha1> [<parent sha1>]*

    A commit without parents is written on its own. Lines are joined with
    a newline; no trailing newline is added.

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    return b"\n".join(
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    )

357 

358 

def _set_filesystem_hidden(path: str) -> None:
    """Flag ``path`` as hidden where the platform offers a way to do so.

    Only win32 is handled, through the SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
    On every other platform this is a no-op.
    """
    if sys.platform != "win32":
        # Could implement other platform specific filesystem hiding here
        return

    import ctypes
    from ctypes.wintypes import BOOL, DWORD, LPCWSTR

    FILE_ATTRIBUTE_HIDDEN = 2
    prototype = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)
    set_file_attributes = prototype(("SetFileAttributesW", ctypes.windll.kernel32))

    native_path = os.fsdecode(path) if isinstance(path, bytes) else path
    # A failed call is deliberately ignored.
    # Could raise or log `ctypes.WinError()` here
    set_file_attributes(native_path, FILE_ATTRIBUTE_HIDDEN)

380 

381 

382def parse_shared_repository( 

383 value: str | bytes | bool, 

384) -> tuple[int | None, int | None]: 

385 """Parse core.sharedRepository configuration value. 

386 

387 Args: 

388 value: Configuration value (string, bytes, or boolean) 

389 

390 Returns: 

391 tuple of (file_mask, directory_mask) or (None, None) if not shared 

392 

393 The masks are permission bits to apply via chmod. 

394 """ 

395 if isinstance(value, bytes): 

396 value = value.decode("utf-8", errors="replace") 

397 

398 # Handle boolean values 

399 if isinstance(value, bool): 

400 if value: 

401 # true = group (same as "group") 

402 return (0o664, 0o2775) 

403 else: 

404 # false = umask (use system umask, no adjustment) 

405 return (None, None) 

406 

407 # Handle string values 

408 value_lower = value.lower() 

409 

410 if value_lower in ("false", "0", ""): 

411 # Use umask (no adjustment) 

412 return (None, None) 

413 

414 if value_lower in ("true", "1", "group"): 

415 # Group writable (with setgid bit) 

416 return (0o664, 0o2775) 

417 

418 if value_lower in ("all", "world", "everybody", "2"): 

419 # World readable/writable (with setgid bit) 

420 return (0o666, 0o2777) 

421 

422 if value_lower == "umask": 

423 # Explicitly use umask 

424 return (None, None) 

425 

426 # Try to parse as octal 

427 if value.startswith("0"): 

428 try: 

429 mode = int(value, 8) 

430 # For directories, add execute bits where read bits are set 

431 # and add setgid bit for shared repositories 

432 dir_mode = mode | 0o2000 # Add setgid bit 

433 if mode & 0o004: 

434 dir_mode |= 0o001 

435 if mode & 0o040: 

436 dir_mode |= 0o010 

437 if mode & 0o400: 

438 dir_mode |= 0o100 

439 return (mode, dir_mode) 

440 except ValueError: 

441 pass 

442 

443 # Default to umask for unrecognized values 

444 return (None, None) 

445 

446 

def _enable_relative_worktrees_extension(repo: "Repo") -> None:
    """Turn on extensions.relativeworktrees in a repository's config.

    core.repositoryformatversion is raised to 1 when it is missing or
    lower than 1, then the extension flag is set and the config is written
    back via ``write_to_path()``.

    Args:
        repo: The repository to configure
    """
    cfg = repo.get_config()

    # A missing version counts as version 0.
    try:
        current_version = int(cfg.get(("core",), "repositoryformatversion"))
    except KeyError:
        current_version = 0

    needs_upgrade = current_version < 1
    if needs_upgrade:
        cfg.set(("core",), "repositoryformatversion", "1")

    cfg.set(("extensions",), "relativeworktrees", True)
    cfg.write_to_path()

470 

471 

class ParentsProvider:
    """Provider for commit parent information.

    Parents are resolved, in order, from graft overrides, the set of
    shallow commits (which report no parents), the object store's commit
    graph when it has one, and finally the commit object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: dict[ObjectID, list[ObjectID]] | None = None,
        shallows: Iterable[ObjectID] | None = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Graft information; the mapping is stored as-is (not
                copied). Defaults to a new empty dict per instance.
            shallows: Shallow commit SHAs; copied into a set. Defaults to
                no shallow commits.
        """
        self.store = store
        # None instead of a literal ``{}`` default: a default dict would be
        # one object shared (and mutated) across every instance.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: ObjectID, commit: Commit | None = None
    ) -> list[ObjectID]:
        """Get parents for a commit using the parents provider.

        Args:
            commit_id: SHA of the commit to look up
            commit: Optional already-loaded commit matching ``commit_id``;
                avoids a read from the store in the fallback path

        Returns:
            List of parent SHAs

        Raises:
            ValueError: If the store has to be consulted and ``commit_id``
                resolves to an object that is not a Commit
        """
        # Grafts override everything else.
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            if not isinstance(obj, Commit):
                raise ValueError(
                    f"Expected Commit object for commit_id {commit_id.decode()}, "
                    f"got {type(obj).__name__}. This usually means a reference "
                    f"points to a {type(obj).__name__} object instead of a Commit."
                )
            commit = obj
        result: list[ObjectID] = commit.parents
        return result

524 

525 

526class BaseRepo: 

527 """Base class for a git repository. 

528 

529 This base class is meant to be used for Repository implementations that e.g. 

530 work on top of a different transport than a standard filesystem path. 

531 

532 Attributes: 

533 object_store: Dictionary-like object for accessing 

534 the objects 

535 refs: Dictionary-like object with the refs in this 

536 repository 

537 """ 

538 

539 def __init__( 

540 self, 

541 object_store: "PackCapableObjectStore", 

542 refs: RefsContainer, 

543 object_format: "ObjectFormat | None" = None, 

544 ) -> None: 

545 """Open a repository. 

546 

547 This shouldn't be called directly, but rather through one of the 

548 base classes, such as MemoryRepo or Repo. 

549 

550 Args: 

551 object_store: Object store to use 

552 refs: Refs container to use 

553 object_format: Hash algorithm to use (if None, will use object_store's format) 

554 """ 

555 self.object_store = object_store 

556 self.refs = refs 

557 

558 self._graftpoints: dict[ObjectID, list[ObjectID]] = {} 

559 self.hooks: dict[str, Hook] = {} 

560 if object_format is None: 

561 self.object_format: ObjectFormat = object_store.object_format 

562 else: 

563 self.object_format = object_format 

564 

565 def _determine_file_mode(self) -> bool: 

566 """Probe the file-system to determine whether permissions can be trusted. 

567 

568 Returns: True if permissions can be trusted, False otherwise. 

569 """ 

570 raise NotImplementedError(self._determine_file_mode) 

571 

572 def _determine_symlinks(self) -> bool: 

573 """Probe the filesystem to determine whether symlinks can be created. 

574 

575 Returns: True if symlinks can be created, False otherwise. 

576 """ 

577 # For now, just mimic the old behaviour 

578 return sys.platform != "win32" 

579 

580 def _init_files( 

581 self, 

582 bare: bool, 

583 symlinks: bool | None = None, 

584 format: int | None = None, 

585 shared_repository: str | bool | None = None, 

586 object_format: str | None = None, 

587 ) -> None: 

588 """Initialize a default set of named files.""" 

589 from .config import ConfigFile 

590 

591 self._put_named_file("description", b"Unnamed repository") 

592 f = BytesIO() 

593 cf = ConfigFile() 

594 

595 # Determine the appropriate format version 

596 if object_format == "sha256": 

597 # SHA256 requires format version 1 

598 if format is None: 

599 format = 1 

600 elif format != 1: 

601 raise ValueError( 

602 "SHA256 object format requires repository format version 1" 

603 ) 

604 else: 

605 # SHA1 (default) can use format 0 or 1 

606 if format is None: 

607 format = 0 

608 

609 if format not in (0, 1): 

610 raise ValueError(f"Unsupported repository format version: {format}") 

611 

612 cf.set("core", "repositoryformatversion", str(format)) 

613 

614 # Set object format extension if using SHA256 

615 if object_format == "sha256": 

616 cf.set("extensions", "objectformat", "sha256") 

617 

618 # Set hash algorithm based on object format 

619 from .object_format import get_object_format 

620 

621 self.object_format = get_object_format(object_format) 

622 

623 if self._determine_file_mode(): 

624 cf.set("core", "filemode", True) 

625 else: 

626 cf.set("core", "filemode", False) 

627 

628 if symlinks is None and not bare: 

629 symlinks = self._determine_symlinks() 

630 

631 if symlinks is False: 

632 cf.set("core", "symlinks", symlinks) 

633 

634 # On macOS, set precomposeunicode to true since HFS+/APFS 

635 # returns filenames in NFD (decomposed) Unicode form 

636 if sys.platform == "darwin": 

637 cf.set("core", "precomposeunicode", True) 

638 

639 cf.set("core", "bare", bare) 

640 cf.set("core", "logallrefupdates", True) 

641 

642 # Set shared repository if specified 

643 if shared_repository is not None: 

644 if isinstance(shared_repository, bool): 

645 cf.set("core", "sharedRepository", shared_repository) 

646 else: 

647 cf.set("core", "sharedRepository", shared_repository) 

648 

649 cf.write_to_file(f) 

650 self._put_named_file("config", f.getvalue()) 

651 self._put_named_file(os.path.join("info", "exclude"), b"") 

652 

653 # Allow subclasses to handle config initialization 

654 self._init_config(cf) 

655 

656 def _init_config(self, config: "ConfigFile") -> None: 

657 """Initialize repository configuration. 

658 

659 This method can be overridden by subclasses to handle config initialization. 

660 

661 Args: 

662 config: The ConfigFile object that was just created 

663 """ 

664 # Default implementation does nothing 

665 

666 def get_named_file(self, path: str) -> BinaryIO | None: 

667 """Get a file from the control dir with a specific name. 

668 

669 Although the filename should be interpreted as a filename relative to 

670 the control dir in a disk-based Repo, the object returned need not be 

671 pointing to a file in that location. 

672 

673 Args: 

674 path: The path to the file, relative to the control dir. 

675 Returns: An open file object, or None if the file does not exist. 

676 """ 

677 raise NotImplementedError(self.get_named_file) 

678 

679 def _put_named_file(self, path: str, contents: bytes) -> None: 

680 """Write a file to the control dir with the given name and contents. 

681 

682 Args: 

683 path: The path to the file, relative to the control dir. 

684 contents: A string to write to the file. 

685 """ 

686 raise NotImplementedError(self._put_named_file) 

687 

688 def _del_named_file(self, path: str) -> None: 

689 """Delete a file in the control directory with the given name.""" 

690 raise NotImplementedError(self._del_named_file) 

691 

692 def open_index(self, config: "Config | None" = None) -> "Index": 

693 """Open the index for this repository. 

694 

695 Args: 

696 config: Configuration to consult for index settings. If None, 

697 implementations may fall back to ``self.get_config_stack()``. 

698 

699 Raises: 

700 NoIndexPresent: If no index is present 

701 Returns: The matching `Index` 

702 """ 

703 raise NotImplementedError(self.open_index) 

704 

705 def _change_object_format(self, object_format_name: str) -> None: 

706 """Change the object format of this repository. 

707 

708 This can only be done if the object store is empty (no objects written yet). 

709 

710 Args: 

711 object_format_name: Name of the new object format (e.g., "sha1", "sha256") 

712 

713 Raises: 

714 AssertionError: If the object store is not empty 

715 """ 

716 # Check if object store has any objects 

717 for _ in self.object_store: 

718 raise AssertionError( 

719 "Cannot change object format: repository already contains objects" 

720 ) 

721 

722 # Update the object format 

723 from .object_format import get_object_format 

724 

725 new_format = get_object_format(object_format_name) 

726 self.object_format = new_format 

727 self.object_store.object_format = new_format 

728 

729 # Update config file 

730 config = self.get_config() 

731 

732 if object_format_name == "sha1": 

733 # For SHA-1, explicitly remove objectformat extension if present 

734 try: 

735 config.remove("extensions", "objectformat") 

736 except KeyError: 

737 pass 

738 else: 

739 # For non-SHA-1 formats, set repositoryformatversion to 1 and objectformat extension 

740 config.set("core", "repositoryformatversion", "1") 

741 config.set("extensions", "objectformat", object_format_name) 

742 

743 config.write_to_path() 

744 

745 def fetch( 

746 self, 

747 target: "BaseRepo", 

748 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]] 

749 | None = None, 

750 progress: Callable[..., None] | None = None, 

751 depth: int | None = None, 

752 ) -> dict[Ref, ObjectID]: 

753 """Fetch objects into another repository. 

754 

755 Args: 

756 target: The target repository 

757 determine_wants: Optional function to determine what refs to 

758 fetch. 

759 progress: Optional progress function 

760 depth: Optional shallow fetch depth 

761 Returns: The local refs 

762 """ 

763 # Fix object format if needed 

764 if self.object_format != target.object_format: 

765 # Change the target repo's format if it's empty 

766 target._change_object_format(self.object_format.name) 

767 

768 if determine_wants is None: 

769 determine_wants = target.object_store.determine_wants_all 

770 count, pack_data = self.fetch_pack_data( 

771 determine_wants, 

772 target.get_graph_walker(), 

773 progress=progress, 

774 depth=depth, 

775 ) 

776 target.object_store.add_pack_data(count, pack_data, progress) 

777 return self.get_refs() 

778 

779 def fetch_pack_data( 

780 self, 

781 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]], 

782 graph_walker: "GraphWalker", 

783 progress: Callable[[bytes], None] | None, 

784 *, 

785 get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None, 

786 depth: int | None = None, 

787 ) -> tuple[int, Iterator["UnpackedObject"]]: 

788 """Fetch the pack data required for a set of revisions. 

789 

790 Args: 

791 determine_wants: Function that takes a dictionary with heads 

792 and returns the list of heads to fetch. 

793 graph_walker: Object that can iterate over the list of revisions 

794 to fetch and has an "ack" method that will be called to acknowledge 

795 that a revision is present. 

796 progress: Simple progress function that will be called with 

797 updated progress strings. 

798 get_tagged: Function that returns a dict of pointed-to sha -> 

799 tag sha for including tags. 

800 depth: Shallow fetch depth 

801 Returns: count and iterator over pack data 

802 """ 

803 missing_objects = self.find_missing_objects( 

804 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth 

805 ) 

806 if missing_objects is None: 

807 return 0, iter([]) 

808 remote_has = missing_objects.get_remote_has() 

809 object_ids = list(missing_objects) 

810 return len(object_ids), generate_unpacked_objects( 

811 self.object_store, object_ids, progress=progress, other_haves=remote_has 

812 ) 

813 

814 def find_missing_objects( 

815 self, 

816 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]], 

817 graph_walker: "GraphWalker", 

818 progress: Callable[[bytes], None] | None, 

819 *, 

820 get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None, 

821 depth: int | None = None, 

822 ) -> MissingObjectFinder | None: 

823 """Fetch the missing objects required for a set of revisions. 

824 

825 Args: 

826 determine_wants: Function that takes a dictionary with heads 

827 and returns the list of heads to fetch. 

828 graph_walker: Object that can iterate over the list of revisions 

829 to fetch and has an "ack" method that will be called to acknowledge 

830 that a revision is present. 

831 progress: Simple progress function that will be called with 

832 updated progress strings. 

833 get_tagged: Function that returns a dict of pointed-to sha -> 

834 tag sha for including tags. 

835 depth: Shallow fetch depth 

836 Returns: iterator over objects, with __len__ implemented 

837 """ 

838 import logging 

839 

840 # Filter out refs pointing to missing objects to avoid errors downstream. 

841 # This makes Dulwich more robust when dealing with broken refs on disk. 

842 # Previously serialize_refs() did this filtering as a side-effect. 

843 all_refs = self.get_refs() 

844 refs: dict[Ref, ObjectID] = {} 

845 for ref, sha in all_refs.items(): 

846 if sha in self.object_store: 

847 refs[ref] = sha 

848 else: 

849 logging.warning( 

850 "ref %s points at non-present sha %s", 

851 ref.decode("utf-8", "replace"), 

852 sha.decode("ascii"), 

853 ) 

854 

855 wants = determine_wants(refs, depth) 

856 if not isinstance(wants, list): 

857 raise TypeError("determine_wants() did not return a list") 

858 

859 current_shallow = set(getattr(graph_walker, "shallow", set())) 

860 

861 if depth not in (None, 0): 

862 assert depth is not None 

863 shallow, not_shallow = find_shallow(self.object_store, wants, depth) 

864 # Only update if graph_walker has shallow attribute 

865 if hasattr(graph_walker, "shallow"): 

866 graph_walker.shallow.update(shallow - not_shallow) 

867 new_shallow = graph_walker.shallow - current_shallow 

868 unshallow = not_shallow & current_shallow 

869 setattr(graph_walker, "unshallow", unshallow) 

870 if hasattr(graph_walker, "update_shallow"): 

871 graph_walker.update_shallow(new_shallow, unshallow) 

872 else: 

873 unshallow = getattr(graph_walker, "unshallow", set()) 

874 

875 if wants == []: 

876 # TODO(dborowitz): find a way to short-circuit that doesn't change 

877 # this interface. 

878 

879 if getattr(graph_walker, "shallow", set()) or unshallow: 

880 # Do not send a pack in shallow short-circuit path 

881 return None 

882 

883 # Return an actual MissingObjectFinder with empty wants 

884 return MissingObjectFinder( 

885 self.object_store, 

886 haves=[], 

887 wants=[], 

888 ) 

889 

890 # If the graph walker is set up with an implementation that can 

891 # ACK/NAK to the wire, it will write data to the client through 

892 # this call as a side-effect. 

893 haves = self.object_store.find_common_revisions(graph_walker) 

894 

895 # Deal with shallow requests separately because the haves do 

896 # not reflect what objects are missing 

897 if getattr(graph_walker, "shallow", set()) or unshallow: 

898 # TODO: filter the haves commits from iter_shas. the specific 

899 # commits aren't missing. 

900 haves = [] 

901 

902 parents_provider = ParentsProvider(self.object_store, shallows=current_shallow) 

903 

904 def get_parents(commit: Commit) -> list[ObjectID]: 

905 """Get parents for a commit using the parents provider. 

906 

907 Args: 

908 commit: Commit object 

909 

910 Returns: 

911 List of parent commit SHAs 

912 """ 

913 return parents_provider.get_parents(commit.id, commit) 

914 

915 return MissingObjectFinder( 

916 self.object_store, 

917 haves=haves, 

918 wants=wants, 

919 shallow=getattr(graph_walker, "shallow", set()), 

920 progress=progress, 

921 get_tagged=get_tagged, 

922 get_parents=get_parents, 

923 ) 

924 

925 def generate_pack_data( 

926 self, 

927 have: set[ObjectID], 

928 want: set[ObjectID], 

929 *, 

930 shallow: set[ObjectID] | None = None, 

931 progress: Callable[[str], None] | None = None, 

932 ofs_delta: bool | None = None, 

933 ) -> tuple[int, Iterator["UnpackedObject"]]: 

934 """Generate pack data objects for a set of wants/haves. 

935 

936 Args: 

937 have: List of SHA1s of objects that should not be sent 

938 want: List of SHA1s of objects that should be sent 

939 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits) 

940 ofs_delta: Whether OFS deltas can be included 

941 progress: Optional progress reporting method 

942 """ 

943 if shallow is None: 

944 shallow = self.get_shallow() 

945 return self.object_store.generate_pack_data( 

946 have, 

947 want, 

948 shallow=shallow, 

949 progress=progress, 

950 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA, 

951 ) 

952 

953 def get_graph_walker( 

954 self, heads: list[ObjectID] | None = None 

955 ) -> ObjectStoreGraphWalker: 

956 """Retrieve a graph walker. 

957 

958 A graph walker is used by a remote repository (or proxy) 

959 to find out which objects are present in this repository. 

960 

961 Args: 

962 heads: Repository heads to use (optional) 

963 Returns: A graph walker object 

964 """ 

965 if heads is None: 

966 heads = [ 

967 sha 

968 for sha in self.refs.as_dict(Ref(b"refs/heads")).values() 

969 if sha in self.object_store 

970 ] 

971 parents_provider = ParentsProvider(self.object_store) 

972 return ObjectStoreGraphWalker( 

973 heads, 

974 parents_provider.get_parents, 

975 shallow=self.get_shallow(), 

976 update_shallow=self.update_shallow, 

977 ) 

978 

979 def get_refs(self) -> dict[Ref, ObjectID]: 

980 """Get dictionary with all refs. 

981 

982 Returns: A ``dict`` mapping ref names to SHA1s 

983 """ 

984 return self.refs.as_dict() 

985 

def head(self) -> ObjectID:
    """Look up HEAD in the refs container and return the SHA1 it points at."""
    # TODO: move this method to WorkTree
    head_sha = self.refs[HEADREF]
    return head_sha

990 

991 def _get_object(self, sha: ObjectID | RawObjectID, cls: type[T]) -> T: 

992 assert len(sha) in ( 

993 self.object_format.oid_length, 

994 self.object_format.hex_length, 

995 ) 

996 ret = self.get_object(sha) 

997 if not isinstance(ret, cls): 

998 if cls is Commit: 

999 raise NotCommitError(ret.id) 

1000 elif cls is Blob: 

1001 raise NotBlobError(ret.id) 

1002 elif cls is Tree: 

1003 raise NotTreeError(ret.id) 

1004 elif cls is Tag: 

1005 raise NotTagError(ret.id) 

1006 else: 

1007 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

1008 return ret 

1009 

def get_object(self, sha: ObjectID | RawObjectID) -> ShaFile:
    """Look up an object in the object store by its SHA.

    Args:
      sha: SHA to retrieve
    Returns: A ShaFile object
    Raises:
      KeyError: when the object can not be found
    """
    store = self.object_store
    return store[sha]

1020 

def parents_provider(self) -> ParentsProvider:
    """Create a parents provider bound to this repository.

    Returns:
      ParentsProvider built from the object store, the repository's
      graftpoints and its current shallow commits
    """
    provider = ParentsProvider(
        self.object_store,
        grafts=self._graftpoints,
        shallows=self.get_shallow(),
    )
    return provider

1032 

def get_parents(
    self, sha: ObjectID, commit: Commit | None = None
) -> list[ObjectID]:
    """Return the parents of a commit via ``self.parents_provider()``.

    If the specific commit is a graftpoint, the graft parents
    will be returned instead.

    Args:
      sha: SHA of the commit for which to retrieve the parents
      commit: Optional commit matching the sha
    Returns: List of parents
    """
    provider = self.parents_provider()
    return provider.get_parents(sha, commit)

1047 

def get_config(self) -> "ConfigFile":
    """Return the repository's config object.

    This implementation always raises.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    Raises:
      NotImplementedError: always
    """
    unimplemented = self.get_config
    raise NotImplementedError(unimplemented)

1054 

def get_worktree_config(self) -> "ConfigFile":
    """Return the worktree config object.

    This implementation always raises.

    Raises:
      NotImplementedError: always
    """
    unimplemented = self.get_worktree_config
    raise NotImplementedError(unimplemented)

1058 

1059 def get_description(self) -> bytes | None: 

1060 """Retrieve the description for this repository. 

1061 

1062 Returns: Bytes with the description of the repository 

1063 as set by the user. 

1064 """ 

1065 raise NotImplementedError(self.get_description) 

1066 

def set_description(self, description: bytes) -> None:
    """Store a new description for this repository.

    This implementation always raises.

    Args:
      description: Text to set as description for this repository.
    Raises:
      NotImplementedError: always
    """
    unimplemented = self.set_description
    raise NotImplementedError(unimplemented)

1074 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Return the rebase state manager appropriate for this repository.

    This implementation always raises.

    Returns: RebaseStateManager instance
    Raises:
      NotImplementedError: always
    """
    unimplemented = self.get_rebase_state_manager
    raise NotImplementedError(unimplemented)

1081 

def get_blob_normalizer(
    self, config: "Config | None" = None
) -> "FilterBlobNormalizer":
    """Return a BlobNormalizer object for checkin/checkout operations.

    This implementation always raises.

    Args:
      config: Configuration to consult for filter setup. If None,
        implementations may fall back to ``self.get_config_stack()``.

    Returns: BlobNormalizer instance
    Raises:
      NotImplementedError: always
    """
    unimplemented = self.get_blob_normalizer
    raise NotImplementedError(unimplemented)

1094 

1095 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes": 

1096 """Read gitattributes for the repository. 

1097 

1098 Args: 

1099 tree: Tree SHA to read .gitattributes from (defaults to HEAD) 

1100 

1101 Returns: 

1102 GitAttributes object that can be used to match paths 

1103 """ 

1104 raise NotImplementedError(self.get_gitattributes) 

1105 

def get_config_stack(self) -> "StackedConfig":
    """Build the layered configuration for this repository.

    This stack accesses the configuration for both this repository
    itself (.git/config) and the global configuration, which usually
    lives in ~/.gitconfig. The worktree config is inserted after the
    repository config when ``extensions.worktreeconfig`` is true.

    Returns: `Config` instance for this repository
    """
    from .config import ConfigFile, StackedConfig

    repo_config = self.get_config()
    layers: list[ConfigFile] = [repo_config]
    worktree_config_enabled = repo_config.get_boolean(
        (b"extensions",), b"worktreeconfig", False
    )
    if worktree_config_enabled:
        layers.append(self.get_worktree_config())

    layers.extend(StackedConfig.default_backends())
    # The repository's own config is the one that receives writes.
    return StackedConfig(layers, writable=repo_config)

1124 

def get_shallow(self) -> set[ObjectID]:
    """Get the set of shallow commits.

    Reads the ``shallow`` named file, one commit id per line. Blank
    lines are skipped so they never produce an empty object id, in
    line with how ``_read_heads`` parses its files.

    Returns: Set of shallow commits; empty when the file does not exist.
    """
    f = self.get_named_file("shallow")
    if f is None:
        return set()
    with f:
        return {ObjectID(sha) for line in f if (sha := line.strip())}

1135 

def update_shallow(
    self, new_shallow: set[ObjectID] | None, new_unshallow: set[ObjectID] | None
) -> None:
    """Apply additions and removals to the stored shallow commit list.

    The resulting set is written to the ``shallow`` named file, one id
    per line; when the set ends up empty the file is deleted instead.

    Args:
      new_shallow: Newly shallow objects
      new_unshallow: Newly no longer shallow objects
    """
    current = self.get_shallow()
    if new_shallow:
        current |= new_shallow
    if new_unshallow:
        current -= new_unshallow
    if not current:
        self._del_named_file("shallow")
        return
    lines = [sha + b"\n" for sha in current]
    self._put_named_file("shallow", b"".join(lines))

1154 

def get_peeled(self, ref: Ref) -> ObjectID:
    """Resolve a ref to its fully peeled object id.

    The refs container is consulted first; only when it has no peeled
    value is the ref's target peeled through the object store.

    Args:
      ref: The refname to peel.
    Returns: The fully-peeled SHA1 of a tag object, after peeling all
      intermediate tags; if the original ref does not point to a tag,
      this will equal the original SHA1.
    """
    cached = self.refs.get_peeled(ref)
    if cached is None:
        peeled = peel_sha(self.object_store, self.refs[ref])[1]
        return peeled.id
    return cached

1168 

@property
def notes(self) -> "Notes":
    """Notes accessor for this repository.

    A new ``Notes`` object is constructed from the object store and
    refs container each time the property is read.

    Returns:
      Notes object for accessing notes
    """
    from .notes import Notes

    notes_api = Notes(self.object_store, self.refs)
    return notes_api

1179 

def get_walker(
    self,
    include: Sequence[ObjectID] | None = None,
    exclude: Sequence[ObjectID] | None = None,
    order: str = "date",
    reverse: bool = False,
    max_entries: int | None = None,
    paths: Sequence[bytes] | None = None,
    rename_detector: "RenameDetector | None" = None,
    follow: bool = False,
    since: int | None = None,
    until: int | None = None,
    queue_cls: type | None = None,
) -> "Walker":
    """Create a commit walker over this repository's object store.

    Args:
      include: Iterable of SHAs of commits to include along with their
        ancestors. Defaults to [HEAD]
      exclude: Iterable of SHAs of commits to exclude along with their
        ancestors, overriding includes.
      order: ORDER_* constant specifying the order of results.
        Anything other than ORDER_DATE may result in O(n) memory usage.
      reverse: If True, reverse the order of output, requiring O(n)
        memory.
      max_entries: The maximum number of entries to yield, or None for
        no limit.
      paths: Iterable of file or subtree paths to show entries for.
      rename_detector: diff.RenameDetector object for detecting
        renames.
      follow: If True, follow path across renames/copies. Forces a
        default rename_detector.
      since: Timestamp to list commits after.
      until: Timestamp to list commits before.
      queue_cls: A class to use for a queue of commits, supporting the
        iterator protocol. The constructor takes a single argument, the Walker.
        Defaults to ``_CommitTimeQueue``.

    Returns: A `Walker` object
    """
    from .walk import Walker, _CommitTimeQueue

    start_points = [self.head()] if include is None else include
    commit_queue_cls = _CommitTimeQueue if queue_cls is None else queue_cls

    def lookup_parents(commit):
        # Route through the repository so get_parents() decides the result.
        return self.get_parents(commit.id, commit)

    # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
    return Walker(
        self.object_store,
        start_points,
        exclude=exclude,
        order=order,
        reverse=reverse,
        max_entries=max_entries,
        paths=paths,
        rename_detector=rename_detector,
        follow=follow,
        since=since,
        until=until,
        get_parents=lookup_parents,
        queue_cls=commit_queue_cls,
    )

1240 

1241 def __getitem__(self, name: ObjectID | Ref | bytes) -> "ShaFile": 

1242 """Retrieve a Git object by SHA1 or ref. 

1243 

1244 Args: 

1245 name: A Git object SHA1 or a ref name 

1246 Returns: A `ShaFile` object, such as a Commit or Blob 

1247 Raises: 

1248 KeyError: when the specified ref or object does not exist 

1249 """ 

1250 if not isinstance(name, bytes): 

1251 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

1252 # If it looks like a ref name, only try refs 

1253 if name == b"HEAD" or name.startswith(b"refs/"): 

1254 try: 

1255 return self.object_store[self.refs[Ref(name)]] 

1256 except (RefFormatError, KeyError): 

1257 pass 

1258 # Otherwise, try as object ID if length matches 

1259 if len(name) in ( 

1260 self.object_store.object_format.oid_length, 

1261 self.object_store.object_format.hex_length, 

1262 ): 

1263 try: 

1264 return self.object_store[ 

1265 ObjectID(name) 

1266 if len(name) == self.object_store.object_format.hex_length 

1267 else RawObjectID(name) 

1268 ] 

1269 except (KeyError, ValueError): 

1270 pass 

1271 # If nothing worked, raise KeyError 

1272 raise KeyError(name) 

1273 

def __contains__(self, name: bytes) -> bool:
    """Report whether a Git object or ref with this name exists.

    Names whose length matches a raw or hex object id are looked up in
    the object store first and then in the refs container; every other
    name is looked up in the refs container only.

    Args:
      name: Git object SHA1/SHA256 or ref name
    """
    name_length = len(name)

    # NOTE(review): these two fixed-length checks appear to duplicate the
    # object-format checks below for SHA-1 repositories - confirm intent.
    if name_length == 20:
        return RawObjectID(name) in self.object_store or Ref(name) in self.refs
    if name_length == 40 and valid_hexsha(name):
        return ObjectID(name) in self.object_store or Ref(name) in self.refs

    # Check if it's a binary or hex SHA
    if name_length == self.object_format.oid_length:
        return RawObjectID(name) in self.object_store or Ref(name) in self.refs
    if name_length == self.object_format.hex_length and valid_hexsha(name):
        return ObjectID(name) in self.object_store or Ref(name) in self.refs
    return Ref(name) in self.refs

1291 

def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
    """Point a ref at a new value.

    Args:
      name: ref name; must start with ``refs/`` or equal ``HEADREF``
      value: Ref value - either a ShaFile object, or a hex sha
    Raises:
      ValueError: when ``name`` is not an accepted ref name
      TypeError: when ``value`` is neither a ShaFile nor bytes
    """
    if not (name.startswith(b"refs/") or name == HEADREF):
        raise ValueError(name)
    target_ref = Ref(name)
    if isinstance(value, ShaFile):
        self.refs[target_ref] = value.id
        return
    if isinstance(value, bytes):
        self.refs[target_ref] = ObjectID(value)
        return
    raise TypeError(value)

1309 

def __delitem__(self, name: bytes) -> None:
    """Delete a ref from the refs container.

    Args:
      name: Name of the ref to remove; must start with ``refs/`` or
        equal ``HEADREF``
    Raises:
      ValueError: when ``name`` is not an accepted ref name
    """
    if not (name.startswith(b"refs/") or name == HEADREF):
        raise ValueError(name)
    del self.refs[Ref(name)]

1320 

1321 def _get_user_identity( 

1322 self, config: "StackedConfig", kind: str | None = None 

1323 ) -> bytes: 

1324 """Determine the identity to use for new commits.""" 

1325 warnings.warn( 

1326 "use get_user_identity() rather than Repo._get_user_identity", 

1327 DeprecationWarning, 

1328 ) 

1329 return get_user_identity(config) 

1330 

1331 def _add_graftpoints( 

1332 self, updated_graftpoints: dict[ObjectID, list[ObjectID]] 

1333 ) -> None: 

1334 """Add or modify graftpoints. 

1335 

1336 Args: 

1337 updated_graftpoints: Dict of commit shas to list of parent shas 

1338 """ 

1339 # Simple validation 

1340 for commit, parents in updated_graftpoints.items(): 

1341 for sha in [commit, *parents]: 

1342 check_hexsha(sha, "Invalid graftpoint") 

1343 

1344 self._graftpoints.update(updated_graftpoints) 

1345 

1346 def _remove_graftpoints(self, to_remove: Sequence[ObjectID] = ()) -> None: 

1347 """Remove graftpoints. 

1348 

1349 Args: 

1350 to_remove: List of commit shas 

1351 """ 

1352 for sha in to_remove: 

1353 del self._graftpoints[sha] 

1354 

1355 def _read_heads(self, name: str) -> list[ObjectID]: 

1356 f = self.get_named_file(name) 

1357 if f is None: 

1358 return [] 

1359 with f: 

1360 return [ObjectID(line.strip()) for line in f.readlines() if line.strip()] 

1361 

def get_worktree(self) -> "WorkTree":
    """Return the working tree for this repository.

    This implementation always raises.

    Returns:
      WorkTree instance for performing working tree operations

    Raises:
      NotImplementedError: If the repository doesn't support working trees
    """
    message = "Working tree operations not supported by this repository type"
    raise NotImplementedError(message)

1374 

1375 

def read_gitfile(f: BinaryIO) -> str:
    """Parse the contents of a ``.git`` file.

    The whole file is read; it must begin with "gitdir: ". Everything
    after that prefix, minus trailing CR/LF characters, is decoded as
    UTF-8 and returned.

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: when the contents do not start with "gitdir: "
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    raw_path = contents.removeprefix(prefix).rstrip(b"\r\n")
    return raw_path.decode("utf-8")

1389 

1390 

class UnsupportedVersion(Exception):
    """Raised for a repository format version that is not supported."""

    # The rejected repository format version, as passed to the constructor.
    version: int

    def __init__(self, version: int) -> None:
        """Record the offending version on the exception.

        Args:
          version: The unsupported repository version
        """
        self.version = version

1401 

1402 

class UnsupportedExtension(Exception):
    """Raised for a repository extension that is not supported."""

    # The rejected extension, as passed to the constructor.
    extension: str

    def __init__(self, extension: str) -> None:
        """Record the offending extension on the exception.

        Args:
          extension: The unsupported repository extension
        """
        self.extension = extension

1413 

1414 

1415class Repo(BaseRepo): 

1416 """A git repository backed by local disk. 

1417 

1418 To open an existing repository, call the constructor with 

1419 the path of the repository. 

1420 

1421 To create a new repository, use the Repo.init class method. 

1422 

1423 Note that a repository object may hold on to resources such 

1424 as file handles for performance reasons; call .close() to free 

1425 up those resources. 

1426 

1427 Attributes: 

1428 path: Path to the working copy (if it exists) or repository control 

1429 directory (if the repository is bare) 

1430 bare: Whether this is a bare repository 

1431 """ 

1432 

1433 path: str 

1434 bare: bool 

1435 object_store: DiskObjectStore 

1436 filter_context: "FilterContext | None" 

1437 

def __init__(
    self,
    root: str | bytes | os.PathLike[str],
    object_store: PackBasedObjectStore | None = None,
    bare: bool | None = None,
) -> None:
    """Open a repository on disk.

    Args:
      root: Path to the repository's root.
      object_store: ObjectStore to use; if omitted, we use the
        repository's default object store
      bare: True if this is a bare repository. When None, the layout
        under ``root`` is probed to decide.

    Raises:
      NotGitRepository: when ``bare`` is None and no repository layout
        is found at ``root``
      UnsupportedVersion: when ``core.repositoryformatversion`` is
        neither 0 nor 1
      UnsupportedExtension: when the config lists an extension this
        code does not accept
    """
    # Normalise root to a str path.
    root = os.fspath(root)
    if isinstance(root, bytes):
        root = os.fsdecode(root)
    hidden_path = os.path.join(root, CONTROLDIR)
    if bare is None:
        # Non-bare: CONTROLDIR is a file, or a directory with OBJECTDIR in it.
        if os.path.isfile(hidden_path) or os.path.isdir(
            os.path.join(hidden_path, OBJECTDIR)
        ):
            bare = False
        # Bare: OBJECTDIR and REFSDIR sit directly under root.
        elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
            os.path.join(root, REFSDIR)
        ):
            bare = True
        else:
            raise NotGitRepository(
                "No git repository was found at {path}".format(**dict(path=root))
            )

    self.bare = bare
    if bare is False:
        if os.path.isfile(hidden_path):
            # CONTROLDIR is a file: it names the real control directory,
            # which is joined onto root.
            with open(hidden_path, "rb") as f:
                path = read_gitfile(f)
            self._controldir = os.path.join(root, path)
        else:
            self._controldir = hidden_path
    else:
        self._controldir = root
    # A COMMONDIR file, when present, holds a path that is joined onto
    # the control directory to give the common directory.
    commondir = self.get_named_file(COMMONDIR)
    if commondir is not None:
        with commondir:
            self._commondir = os.path.join(
                self.controldir(),
                os.fsdecode(commondir.read().rstrip(b"\r\n")),
            )
    else:
        self._commondir = self._controldir
    self.path = root

    # Initialize refs early so they're available for config condition matchers
    self.refs = DiskRefsContainer(
        self.commondir(), self._controldir, logger=self._write_reflog
    )

    # Initialize worktrees container
    from .worktree import WorkTreeContainer

    self.worktrees = WorkTreeContainer(self)

    config = self.get_config()
    # A missing or None repositoryformatversion is treated as version 0.
    try:
        repository_format_version = config.get("core", "repositoryformatversion")
        format_version = (
            0
            if repository_format_version is None
            else int(repository_format_version)
        )
    except KeyError:
        format_version = 0

    if format_version not in (0, 1):
        raise UnsupportedVersion(format_version)

    # Track extensions we encounter
    has_reftable_extension = False
    for extension, value in config.items((b"extensions",)):
        if extension.lower() == b"refstorage":
            # Only the "reftable" value is accepted for refStorage.
            if value == b"reftable":
                has_reftable_extension = True
            else:
                raise UnsupportedExtension(f"refStorage = {value.decode()}")
        elif extension.lower() not in (
            b"worktreeconfig",
            b"objectformat",
            b"relativeworktrees",
        ):
            raise UnsupportedExtension(extension.decode("utf-8"))

    if object_store is None:
        # Get shared repository permissions from config
        try:
            shared_value = config.get(("core",), "sharedRepository")
            file_mode, dir_mode = parse_shared_repository(shared_value)
        except KeyError:
            file_mode, dir_mode = None, None

        # Default object store lives in OBJECTDIR under the common directory.
        object_store = DiskObjectStore.from_config(
            os.path.join(self.commondir(), OBJECTDIR),
            config,
            file_mode=file_mode,
            dir_mode=dir_mode,
        )

    # Use reftable if extension is configured
    if has_reftable_extension:
        from .reftable import ReftableRefsContainer

        self.refs = ReftableRefsContainer(self.commondir())
        # Update worktrees container after refs change
        self.worktrees = WorkTreeContainer(self)
    BaseRepo.__init__(self, object_store, self.refs)

    # Determine hash algorithm from config if not already set
    if self.object_format is None:
        from .object_format import DEFAULT_OBJECT_FORMAT, get_object_format

        # extensions.objectformat is only consulted for format version 1.
        if format_version == 1:
            try:
                object_format = config.get((b"extensions",), b"objectformat")
                self.object_format = get_object_format(
                    object_format.decode("ascii")
                )
            except KeyError:
                self.object_format = DEFAULT_OBJECT_FORMAT
        else:
            self.object_format = DEFAULT_OBJECT_FORMAT

    # Graftpoints are loaded from info/grafts and then from the shallow
    # file, both under the common directory and both parsed with
    # parse_graftpoints.
    self._graftpoints = {}
    graft_file = self.get_named_file(
        os.path.join("info", "grafts"), basedir=self.commondir()
    )
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))
    graft_file = self.get_named_file("shallow", basedir=self.commondir())
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))

    # Register the shell hooks; only pre-commit is also given self.path.
    self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
    self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
    self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
    self.hooks["pre-receive"] = PreReceiveShellHook(self.controldir())
    self.hooks["update"] = UpdateShellHook(self.controldir())
    self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

    # Initialize filter context as None, will be created lazily
    self.filter_context = None

1590 

def get_worktree(self) -> "WorkTree":
    """Return a working tree object rooted at ``self.path``.

    Returns:
      WorkTree instance for performing working tree operations
    """
    from .worktree import WorkTree

    worktree = WorkTree(self, self.path)
    return worktree

1600 

def _write_reflog(
    self,
    ref: bytes,
    old_sha: bytes,
    new_sha: bytes,
    committer: bytes | None,
    timestamp: int | None,
    timezone: int | None,
    message: bytes,
) -> None:
    """Append one entry to the reflog of ``ref``.

    Missing parent directories of the log file are created first, then
    one formatted line is appended to the file.

    Args:
      ref: Ref whose reflog is written
      old_sha: Previous value of the ref
      new_sha: New value of the ref
      committer: Identity to record; when None it is taken from the
        config stack via ``get_user_identity``
      timestamp: Time to record; when None the current time is used
      timezone: Timezone to record; when None, 0 is used
      message: Reflog message
    """
    from .reflog import format_reflog_line

    log_path = self._reflog_path(ref)

    # Get shared repository permissions
    file_mode, dir_mode = self._get_shared_repository_permissions()

    # Collect the missing ancestors deepest-first, then create them from
    # the top down so each level can be given its own permissions.
    missing_dirs = []
    candidate = os.path.dirname(log_path)
    while candidate and not os.path.exists(candidate):
        missing_dirs.append(candidate)
        candidate = os.path.dirname(candidate)
    for directory in reversed(missing_dirs):
        os.mkdir(directory)
        if dir_mode is not None:
            os.chmod(directory, dir_mode)

    if committer is None:
        committer = get_user_identity(self.get_config_stack())
    check_user_identity(committer)
    if timestamp is None:
        timestamp = int(time.time())
    if timezone is None:
        timezone = 0  # FIXME

    with open(log_path, "ab") as log_file:
        entry = format_reflog_line(
            old_sha, new_sha, committer, timestamp, timezone, message
        )
        log_file.write(entry + b"\n")

    # Set file permissions (open() respects umask, so we need chmod to set the actual mode)
    # Always chmod to ensure correct permissions even if file already existed
    if file_mode is not None:
        os.chmod(log_path, file_mode)

1651 

1652 def _reflog_path(self, ref: bytes) -> str: 

1653 if ref.startswith((b"main-worktree/", b"worktrees/")): 

1654 raise NotImplementedError(f"refs {ref.decode()} are not supported") 

1655 

1656 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir() 

1657 return os.path.join(base, "logs", os.fsdecode(ref)) 

1658 

def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
    """Iterate over the reflog entries recorded for a reference.

    Nothing is yielded when the reflog file does not exist.

    Args:
      ref: Reference name (e.g. b'HEAD', b'refs/heads/master')

    Yields:
      reflog.Entry objects in chronological order (oldest first)
    """
    from .reflog import read_reflog as parse_reflog

    log_path = self._reflog_path(ref)
    try:
        with open(log_path, "rb") as log_file:
            yield from parse_reflog(log_file)
    except FileNotFoundError:
        # No reflog on disk: behave as an empty reflog.
        return

1676 

@classmethod
def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
    """Iterate parent directories to discover a repository.

    Return a Repo object for the first parent directory that looks like a
    Git repository.

    Args:
      start: The directory to start discovery from (defaults to '.')
    Raises:
      NotGitRepository: when the filesystem root is reached without
        finding a repository
    """
    path = os.path.abspath(start)
    while True:
        try:
            return cls(path)
        except NotGitRepository:
            new_path, _tail = os.path.split(path)
            if new_path == path:  # Root reached
                break
            path = new_path
    # os.fsdecode accepts str, bytes and path-like objects and never fails
    # on undecodable bytes, so a non-UTF-8 path still produces the
    # NotGitRepository error rather than a UnicodeDecodeError.
    start_str = os.fsdecode(start)
    raise NotGitRepository(f"No git repository was found at {start_str}")

1700 

def controldir(self) -> str:
    """Return the control directory path stored on this repository."""
    control_path = self._controldir
    return control_path

1704 

def commondir(self) -> str:
    """Return the common directory path stored on this repository.

    For a main working tree, it is identical to controldir().

    For a linked working tree, it is the control directory of the
    main working tree.
    """
    common_path = self._commondir
    return common_path

1714 

1715 def _determine_file_mode(self) -> bool: 

1716 """Probe the file-system to determine whether permissions can be trusted. 

1717 

1718 Returns: True if permissions can be trusted, False otherwise. 

1719 """ 

1720 fname = os.path.join(self.path, ".probe-permissions") 

1721 with open(fname, "w") as f: 

1722 f.write("") 

1723 

1724 st1 = os.lstat(fname) 

1725 try: 

1726 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1727 except PermissionError: 

1728 return False 

1729 st2 = os.lstat(fname) 

1730 

1731 os.unlink(fname) 

1732 

1733 mode_differs = st1.st_mode != st2.st_mode 

1734 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1735 

1736 return mode_differs and st2_has_exec 

1737 

1738 def _determine_symlinks(self) -> bool: 

1739 """Probe the filesystem to determine whether symlinks can be created. 

1740 

1741 Returns: True if symlinks can be created, False otherwise. 

1742 """ 

1743 # TODO(jelmer): Actually probe disk / look at filesystem 

1744 return sys.platform != "win32" 

1745 

1746 def _get_shared_repository_permissions( 

1747 self, 

1748 ) -> tuple[int | None, int | None]: 

1749 """Get shared repository file and directory permissions from config. 

1750 

1751 Returns: 

1752 tuple of (file_mask, directory_mask) or (None, None) if not shared 

1753 """ 

1754 try: 

1755 config = self.get_config() 

1756 value = config.get(("core",), "sharedRepository") 

1757 return parse_shared_repository(value) 

1758 except KeyError: 

1759 return (None, None) 

1760 

def _put_named_file(self, path: str, contents: bytes) -> None:
    """Write ``contents`` to a named file inside the control dir.

    Args:
      path: The path to the file, relative to the control dir.
      contents: A string to write to the file.
    """
    relative_path = path.lstrip(os.path.sep)

    # Get shared repository permissions
    file_mode, _ = self._get_shared_repository_permissions()

    # Only pass a mask when the repository is configured as shared.
    open_kwargs = {} if file_mode is None else {"mask": file_mode}
    destination = os.path.join(self.controldir(), relative_path)
    with GitFile(destination, "wb", **open_kwargs) as f:
        f.write(contents)

1782 

1783 def _del_named_file(self, path: str) -> None: 

1784 try: 

1785 os.unlink(os.path.join(self.controldir(), path)) 

1786 except FileNotFoundError: 

1787 return 

1788 

def get_named_file(
    self,
    path: str | bytes,
    basedir: str | None = None,
) -> BinaryIO | None:
    """Open a named file from the control dir for binary reading.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-based Repo, the object returned need not be
    pointing to a file in that location.

    Args:
      path: The path to the file, relative to the control dir. Bytes
        are decoded as UTF-8; leading path separators are stripped.
      basedir: Optional argument that specifies an alternative to the
        control dir.
    Returns: An open file object, or None if the file does not exist.
    """
    # TODO(dborowitz): sanitize filenames, since this is used directly by
    # the dumb web serving code.
    base = self.controldir() if basedir is None else basedir
    relative = path.decode("utf-8") if isinstance(path, bytes) else path
    full_path = os.path.join(base, relative.lstrip(os.path.sep))
    try:
        return open(full_path, "rb")
    except FileNotFoundError:
        return None

1817 

def index_path(self) -> str:
    """Return the path of the index file inside the control directory."""
    control_path = self.controldir()
    return os.path.join(control_path, INDEX_FILENAME)

1821 

def open_index(self, config: "Config | None" = None) -> "Index":
    """Open the index for this repository.

    ``index.version`` and ``index.skipHash`` are read from the config.
    When ``feature.manyFiles`` is true their defaults become version 4
    and skipHash true; otherwise the defaults are None and false.

    Args:
      config: Configuration to consult for index settings. If None,
        falls back to ``self.get_config_stack()``.

    Raises:
      NoIndexPresent: If no index is present
    Returns: The matching `Index`
    """
    from .index import Index, make_path_normalizer

    if not self.has_index():
        raise NoIndexPresent

    if config is None:
        config = self.get_config_stack()
    many_files = config.get_boolean(b"feature", b"manyFiles", False)

    # An explicit index.version always wins; only the fallback differs.
    try:
        index_version = int(config.get(b"index", b"version"))
    except KeyError:
        index_version = 4 if many_files else None
    skip_hash_default = True if many_files else False
    skip_hash = config.get_boolean(b"index", b"skipHash", skip_hash_default)

    # Get shared repository permissions for index file
    file_mode, _ = self._get_shared_repository_permissions()

    return Index(
        self.index_path(),
        skip_hash=skip_hash,
        version=index_version,
        file_mode=file_mode,
        path_normalizer=make_path_normalizer(config),
    )

1871 

def has_index(self) -> bool:
    """Report whether this repository has an index."""
    # Bare repos must never have index files; non-bare repos may have a
    # missing index file, which is treated as empty.
    is_bare = self.bare
    return not is_bare

1877 

def clone(
    self,
    target_path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = True,
    bare: bool = False,
    origin: bytes = b"origin",
    checkout: bool | None = None,
    branch: bytes | None = None,
    progress: Callable[[str], None] | None = None,
    depth: int | None = None,
    symlinks: bool | None = None,
) -> "Repo":
    """Clone this repository.

    Args:
      target_path: Target path
      mkdir: Create the target directory
      bare: Whether to create a bare repository
      checkout: Whether or not to check-out HEAD after cloning
      origin: Base name for refs in target repository
        cloned from this repository
      branch: Optional branch or tag to be used as HEAD in the new repository
        instead of this repository's HEAD.
      progress: Optional progress function (not referenced by this
        method's body)
      depth: Depth at which to fetch
      symlinks: Symlinks setting (default to autodetect)
    Returns: Created repository as `Repo`
    Raises:
      ValueError: when ``checkout`` is requested together with ``bare``
    """
    encoded_path = os.fsencode(self.path)

    if mkdir:
        os.mkdir(target_path)

    try:
        if not bare:
            target = Repo.init(target_path, symlinks=symlinks)
            if checkout is None:
                checkout = True
        else:
            if checkout:
                raise ValueError("checkout and bare are incompatible")
            target = Repo.init_bare(target_path)

        try:
            # Record this repository as the remote named by ``origin``.
            target_config = target.get_config()
            target_config.set((b"remote", origin), b"url", encoded_path)
            target_config.set(
                (b"remote", origin),
                b"fetch",
                b"+refs/heads/*:refs/remotes/" + origin + b"/*",
            )
            target_config.write_to_path()

            ref_message = b"clone: from " + encoded_path
            self.fetch(target, depth=depth)
            # Branches are imported as remote-tracking refs, tags as tags.
            target.refs.import_refs(
                Ref(b"refs/remotes/" + origin),
                self.refs.as_dict(Ref(b"refs/heads")),
                message=ref_message,
            )
            target.refs.import_refs(
                Ref(b"refs/tags"),
                self.refs.as_dict(Ref(b"refs/tags")),
                message=ref_message,
            )

            head_chain, origin_sha = self.refs.follow(HEADREF)
            origin_head = head_chain[-1] if head_chain else None
            # Bind ``head`` on every path so the checkout test below can
            # never hit an unbound local.
            head = None
            if origin_sha and not origin_head:
                # set detached HEAD
                target.refs[HEADREF] = origin_sha
                # HEAD now resolves in the target, so let checkout proceed.
                head = origin_sha
            else:
                _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)

            if checkout and head is not None:
                target.get_worktree().reset_index(config=target.get_config_stack())
        except BaseException:
            # Release the half-built target before propagating.
            target.close()
            raise
    except BaseException:
        # Remove the directory only if this call created it.
        if mkdir:
            import shutil

            shutil.rmtree(target_path)
        raise
    return target

1974 

def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
    """Build the matchers used to evaluate ``includeIf`` conditions.

    Returns a dict mapping a condition prefix (``"onbranch:"``,
    ``"gitdir:"``, ``"gitdir/i:"``) to a function that takes the pattern
    text and returns whether this repository satisfies it.
    """
    from pathlib import Path

    from .config import ConditionMatcher, match_glob_pattern

    def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
        """Match the control directory of this repository against a pattern.

        Args:
          pattern: Pattern to match against
          case_sensitive: Whether to match case-sensitively

        Returns:
          True if the resolved control directory matches the pattern
        """
        # Patterns relative to the config file ("./...") cannot be
        # resolved here because the config directory is not known.
        if pattern.startswith("./"):
            return False

        try:
            resolved_controldir = str(Path(self._controldir).resolve())
        except (OSError, ValueError):
            return False

        # Expand "~", then switch to forward slashes.
        pattern = os.path.expanduser(pattern).replace("\\", "/")

        # Unanchored patterns may match at any depth, except when the
        # pattern looks like a Windows drive path ("C:...").
        anchored = pattern.startswith(("~/", "./", "/", "**"))
        drive_path = len(pattern) >= 2 and pattern[1] == ":"
        if not anchored and not drive_path:
            pattern = "**/" + pattern
        # A trailing slash means "everything below this directory".
        if pattern.endswith("/"):
            pattern = pattern + "**"

        from .config import _match_gitdir_pattern

        return _match_gitdir_pattern(
            resolved_controldir.encode("utf-8", errors="replace"),
            pattern.encode("utf-8", errors="replace"),
            ignorecase=not case_sensitive,
        )

    def match_onbranch(pattern: str) -> bool:
        """Match the currently checked-out branch against a pattern.

        Args:
          pattern: Pattern to match against

        Returns:
          True if HEAD resolves to a ref under ``refs/heads/`` whose
          branch name matches the pattern
        """
        try:
            ref_chain, _ = self.refs.follow(HEADREF)
            final_ref = ref_chain[-1]
        except KeyError:
            return False
        if not final_ref or not final_ref.startswith(b"refs/heads/"):
            return False
        branch_name = extract_branch_name(final_ref).decode(
            "utf-8", errors="replace"
        )
        return match_glob_pattern(branch_name, pattern)

    def match_gitdir_sensitive(pattern: str) -> bool:
        return match_gitdir(pattern, True)

    def match_gitdir_insensitive(pattern: str) -> bool:
        return match_gitdir(pattern, False)

    matchers: dict[str, ConditionMatcher] = {
        "onbranch:": match_onbranch,
        "gitdir:": match_gitdir_sensitive,
        "gitdir/i:": match_gitdir_insensitive,
    }
    return matchers

2062 

def get_worktree_config(self) -> "ConfigFile":
    """Load the worktree-specific configuration.

    The file read is ``config.worktree`` inside ``self.commondir()``. When
    it does not exist, an empty ``ConfigFile`` whose ``path`` points at
    that location is returned instead.

    Returns:
      ConfigFile object for the worktree config
    """
    from .config import ConfigFile

    config_path = os.path.join(self.commondir(), "config.worktree")
    try:
        # includeIf directives are evaluated with this repo's matchers.
        matchers = self._get_config_condition_matchers()
        worktree_config = ConfigFile.from_path(
            config_path, condition_matchers=matchers
        )
    except FileNotFoundError:
        worktree_config = ConfigFile()
        worktree_config.path = config_path
    return worktree_config

2080 

def get_config(self) -> "ConfigFile":
    """Load the repository configuration file.

    Reads ``config`` from the common directory. When the file does not
    exist, an empty ``ConfigFile`` whose ``path`` points at that location
    is returned instead.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    """
    from .config import ConfigFile

    config_path = os.path.join(self._commondir, "config")
    try:
        # includeIf directives are evaluated with this repo's matchers.
        matchers = self._get_config_condition_matchers()
        repo_config = ConfigFile.from_path(config_path, condition_matchers=matchers)
    except FileNotFoundError:
        repo_config = ConfigFile()
        repo_config.path = config_path
    return repo_config

2097 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Get the appropriate rebase state manager for this repository.

    The manager is rooted at the ``rebase-merge`` directory inside this
    repository's control directory.

    Returns: DiskRebaseStateManager instance
    """
    # ``os`` is already available at file level (sibling methods use it),
    # so no function-local import is needed.
    from .rebase import DiskRebaseStateManager

    path = os.path.join(self.controldir(), "rebase-merge")
    return DiskRebaseStateManager(path)

2109 

def get_description(self) -> bytes | None:
    """Read the ``description`` file from the control directory.

    Returns: The raw file contents as bytes, or None when the file does
      not exist.
    """
    description_path = os.path.join(self._controldir, "description")
    try:
        with GitFile(description_path, "rb") as handle:
            contents = handle.read()
    except FileNotFoundError:
        return None
    return contents

2121 

2122 def __repr__(self) -> str: 

2123 """Return string representation of this repository.""" 

2124 return f"<Repo at {self.path!r}>" 

2125 

def set_description(self, description: bytes) -> None:
    """Store a new description for this repository.

    The bytes are written through ``_put_named_file`` under the name
    ``description``.

    Args:
      description: Text to set as description for this repository.
    """
    named_file = "description"
    self._put_named_file(named_file, description)

2133 

@classmethod
def _init_maybe_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    controldir: str | bytes | os.PathLike[str],
    bare: bool,
    object_store: PackBasedObjectStore | None = None,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Shared implementation behind ``init`` and ``init_bare``.

    Creates the base directories under ``controldir``, sets up the object
    store (unless one is supplied), points HEAD at the default branch and
    writes the initial repository files.

    Args:
      path: Repository path passed to the constructor
      controldir: Directory that receives the repository layout
      bare: Whether the repository is bare
      object_store: Object store to use; a ``DiskObjectStore`` is
        initialised under ``controldir`` when omitted
      config: Configuration consulted for ``init.defaultBranch`` when
        ``default_branch`` is not given
      default_branch: Default branch name
      symlinks: Whether to support symlinks
      format: Repository format version
      shared_repository: Shared repository setting
      object_format: Object format name
    Returns: `Repo` instance
    """
    # os.fsdecode accepts str, bytes and PathLike and always yields str.
    path = os.fsdecode(path)
    controldir = os.fsdecode(controldir)

    # Work out shared-repository permissions before anything is created.
    file_mode: int | None = None
    dir_mode: int | None = None
    if shared_repository is not None:
        file_mode, dir_mode = parse_shared_repository(shared_repository)

    for components in BASE_DIRECTORIES:
        base_dir = os.path.join(controldir, *components)
        os.mkdir(base_dir)
        if dir_mode is not None:
            os.chmod(base_dir, dir_mode)

    from .object_format import get_object_format

    hash_alg = get_object_format(object_format)

    if object_store is None:
        object_store = DiskObjectStore.init(
            os.path.join(controldir, OBJECTDIR),
            file_mode=file_mode,
            dir_mode=dir_mode,
            object_format=hash_alg,
        )
    repo = cls(path, bare=bare, object_store=object_store)

    if default_branch is None:
        if config is None:
            from .config import StackedConfig

            config = StackedConfig.default()
        try:
            default_branch = config.get("init", "defaultBranch")
        except KeyError:
            default_branch = DEFAULT_BRANCH
    repo.refs.set_symbolic_ref(HEADREF, local_branch_name(default_branch))

    repo._init_files(
        bare=bare,
        symlinks=symlinks,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )
    return repo

2199 

@classmethod
def init(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new non-bare repository.

    The control directory is created inside ``path`` and marked hidden via
    ``_set_filesystem_hidden`` before the repository layout is written.

    Args:
      path: Path in which to create the repository
      mkdir: Whether to create the directory
      config: Configuration object
      default_branch: Default branch name
      symlinks: Whether to support symlinks
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: `Repo` instance
    """
    # os.fsdecode accepts str, bytes and PathLike and always yields str.
    repo_path = os.fsdecode(path)
    if mkdir:
        os.mkdir(repo_path)

    controldir = os.path.join(repo_path, CONTROLDIR)
    os.mkdir(controldir)
    _set_filesystem_hidden(controldir)

    return cls._init_maybe_bare(
        repo_path,
        controldir,
        False,
        config=config,
        default_branch=default_branch,
        symlinks=symlinks,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )

2245 

@classmethod
def _init_new_working_directory(
    cls,
    path: str | bytes | os.PathLike[str],
    main_repo: "Repo",
    identifier: str | None = None,
    mkdir: bool = False,
    relative_paths: bool = False,
) -> "Repo":
    """Create a new working directory linked to an existing repository.

    Writes a ``.git`` file in the new working tree that points at a
    per-worktree control directory under the main repository's worktrees
    directory, populates that control directory and resets the index of
    the resulting repository.

    Args:
      path: Path in which to create the working tree.
      main_repo: Main repository to reference
      identifier: Worktree identifier (defaults to the basename of ``path``)
      mkdir: Whether to create the directory
      relative_paths: Whether to use relative paths for gitdir references
    Returns: `Repo` instance
    """
    # os.fsdecode accepts str, bytes and PathLike and always yields str.
    worktree_path = os.fsdecode(path)
    if mkdir:
        os.mkdir(worktree_path)
    if identifier is None:
        identifier = os.path.basename(worktree_path)

    # The worktree control directory is always addressed absolutely.
    main_controldir = os.path.abspath(main_repo.controldir())
    main_worktreesdir = os.path.join(main_controldir, WORKTREES)
    worktree_controldir = os.path.join(main_worktreesdir, identifier)
    gitdirfile_abs = os.path.abspath(os.path.join(worktree_path, CONTROLDIR))

    # Shared helper from the worktree module; the reference written to the
    # .git file may be relative.
    from .worktree import _compute_gitdir_path

    gitdir_ref = _compute_gitdir_path(
        main_repo,
        worktree_controldir,
        os.path.dirname(gitdirfile_abs),
        relative_paths,
    )
    with open(gitdirfile_abs, "wb") as gitdir_file:
        gitdir_file.write(b"gitdir: " + os.fsencode(gitdir_ref) + b"\n")

    # New directories use the main repository's shared-repository mode.
    _, dir_mode = main_repo._get_shared_repository_permissions()

    # Already-existing directories are left untouched.
    for directory in (main_worktreesdir, worktree_controldir):
        try:
            os.mkdir(directory)
            if dir_mode is not None:
                os.chmod(directory, dir_mode)
        except FileExistsError:
            pass

    # Back-reference from the control directory to the .git file (may be
    # relative).
    gitdir_path = _compute_gitdir_path(
        main_repo, gitdirfile_abs, worktree_controldir, relative_paths
    )

    with open(os.path.join(worktree_controldir, GITDIR), "wb") as out:
        out.write(os.fsencode(gitdir_path) + b"\n")
    with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as out:
        out.write(b"../..\n")
    with open(os.path.join(worktree_controldir, "HEAD"), "wb") as out:
        out.write(main_repo.head() + b"\n")

    worktree_repo = cls(os.path.normpath(worktree_path))
    worktree_repo.get_worktree().reset_index(
        config=worktree_repo.get_config_stack()
    )
    return worktree_repo

2323 

@classmethod
def init_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    object_store: PackBasedObjectStore | None = None,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new bare repository.

    ``path`` should already exist and be an empty directory, unless
    ``mkdir`` is set. For a bare repository the path itself serves as the
    control directory.

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create the directory
      object_store: Object store to use
      config: Configuration object
      default_branch: Default branch name
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: a `Repo` instance
    """
    # os.fsdecode accepts str, bytes and PathLike and always yields str.
    repo_path = os.fsdecode(path)
    if mkdir:
        os.mkdir(repo_path)

    return cls._init_maybe_bare(
        repo_path,
        repo_path,
        True,
        object_store=object_store,
        config=config,
        default_branch=default_branch,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )

2368 

# ``create`` is a second name for the ``init_bare`` classmethod defined above.
create = init_bare

2370 

def close(self) -> None:
    """Close any files opened by this repository.

    The object store is closed first; a filter context, if one was
    created, is closed afterwards and the attribute reset to None.
    """
    self.object_store.close()

    context = self.filter_context
    if context is None:
        return
    context.close()
    self.filter_context = None

2378 

2379 def __enter__(self) -> Self: 

2380 """Enter context manager.""" 

2381 return self 

2382 

2383 def __exit__( 

2384 self, 

2385 exc_type: type[BaseException] | None, 

2386 exc_val: BaseException | None, 

2387 exc_tb: TracebackType | None, 

2388 ) -> None: 

2389 """Exit context manager and close repository.""" 

2390 self.close() 

2391 

2392 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]: 

2393 """Read .gitattributes file from working tree. 

2394 

2395 Returns: 

2396 Dictionary mapping file patterns to attributes 

2397 """ 

2398 gitattributes = {} 

2399 gitattributes_path = os.path.join(self.path, ".gitattributes") 

2400 

2401 if os.path.exists(gitattributes_path): 

2402 with open(gitattributes_path, "rb") as f: 

2403 for line in f: 

2404 line = line.strip() 

2405 if not line or line.startswith(b"#"): 

2406 continue 

2407 

2408 parts = line.split() 

2409 if len(parts) < 2: 

2410 continue 

2411 

2412 pattern = parts[0] 

2413 attrs = {} 

2414 

2415 for attr in parts[1:]: 

2416 if attr.startswith(b"-"): 

2417 # Unset attribute 

2418 attrs[attr[1:]] = b"false" 

2419 elif b"=" in attr: 

2420 # Set to value 

2421 key, value = attr.split(b"=", 1) 

2422 attrs[key] = value 

2423 else: 

2424 # Set attribute 

2425 attrs[attr] = b"true" 

2426 

2427 gitattributes[pattern] = attrs 

2428 

2429 return gitattributes 

2430 

def get_blob_normalizer(
    self, config: "Config | None" = None
) -> "FilterBlobNormalizer":
    """Build a blob normalizer backed by this repository's filter context.

    The filter context is created on first use and stored on the
    repository; later calls reuse it after refreshing it with the config
    in effect for the call.

    Args:
      config: Configuration to consult for filter setup. If None,
        falls back to ``self.get_config_stack()``.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    if config is None:
        config = self.get_config_stack()
    git_attributes = self.get_gitattributes()

    context = self.filter_context
    if context is not None:
        # Pick up configuration changes made since the context was built.
        context.refresh_config(config)
    else:
        context = FilterContext(FilterRegistry(config, self))
        self.filter_context = context

    return FilterBlobNormalizer(config, git_attributes, filter_context=context)

2457 

def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Patterns are collected, in this order, from the ``.gitattributes``
    blob of the given tree (or of HEAD's tree), from ``info/attributes``
    in the control directory, and from ``.gitattributes`` in the working
    directory. Sources that do not exist are skipped.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths

    Raises:
      ValueError: If HEAD, after peeling tags, is not a commit.
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    patterns = []

    def add_patterns(stream) -> None:
        """Append every (pattern, attributes) pair parsed from ``stream``."""
        for pattern_bytes, attrs in parse_git_attributes(stream):
            patterns.append((Pattern(pattern_bytes), attrs))

    def add_patterns_from_file(path: str) -> None:
        """Parse ``path`` if it exists; do nothing when it is missing."""
        # Open directly instead of checking existence first, so the file
        # cannot disappear between the check and the open.
        try:
            f = open(path, "rb")
        except (FileNotFoundError, NotADirectoryError):
            return
        with f:
            add_patterns(f)

    # Read system gitattributes (TODO: implement this)
    # Read global gitattributes (TODO: implement this)

    # Read repository .gitattributes from index/tree
    if tree is None:
        try:
            # Try to get from HEAD
            head = self[b"HEAD"]
            # Peel tags to get to the underlying commit
            while isinstance(head, Tag):
                _cls, obj = head.object
                head = self.get_object(obj)
            if not isinstance(head, Commit):
                raise ValueError(
                    f"Expected HEAD to point to a Commit, got {type(head).__name__}. "
                    f"This usually means HEAD points to a {type(head).__name__} object "
                    f"instead of a Commit."
                )
            tree = head.tree
        except KeyError:
            # No HEAD, no attributes from tree
            pass

    if tree is not None:
        try:
            tree_obj = self[tree]
            assert isinstance(tree_obj, Tree)
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    add_patterns(BytesIO(attrs_blob.data))
        except (KeyError, NotTreeError):
            pass

    # Read .git/info/attributes
    add_patterns_from_file(os.path.join(self.controldir(), "info", "attributes"))

    # Read .gitattributes from working directory (if it exists)
    add_patterns_from_file(os.path.join(self.path, ".gitattributes"))

    return GitAttributes(patterns)

2530 

2531 

2532class MemoryRepo(BaseRepo): 

2533 """Repo that stores refs, objects, and named files in memory. 

2534 

2535 MemoryRepos are always bare: they have no working tree and no index, since 

2536 those have a stronger dependency on the filesystem. 

2537 """ 

2538 

2539 filter_context: "FilterContext | None" 

2540 

def __init__(self) -> None:
    """Create a new, empty repository held entirely in memory."""
    from .config import ConfigFile
    from .object_format import DEFAULT_OBJECT_FORMAT

    # The reflog list must exist before the refs container is built,
    # because ``_append_reflog`` is handed to it as its logger.
    self._reflog: list[Any] = []
    refs = DictRefsContainer({}, logger=self._append_reflog)
    BaseRepo.__init__(self, MemoryObjectStore(), refs)

    self.bare = True
    self._named_files: dict[str, bytes] = {}
    self._description: bytes | None = None
    self._config = ConfigFile()
    self.filter_context = None
    # MemoryRepo defaults to default object format
    self.object_format = DEFAULT_OBJECT_FORMAT

2556 

2557 def _append_reflog( 

2558 self, 

2559 ref: bytes, 

2560 old_sha: bytes | None, 

2561 new_sha: bytes | None, 

2562 committer: bytes | None, 

2563 timestamp: int | None, 

2564 timezone: int | None, 

2565 message: bytes | None, 

2566 ) -> None: 

2567 self._reflog.append( 

2568 (ref, old_sha, new_sha, committer, timestamp, timezone, message) 

2569 ) 

2570 

def set_description(self, description: bytes) -> None:
    """Store a new description for this repository in memory.

    Args:
      description: Text to set as description
    """
    new_description = description
    self._description = new_description

2578 

2579 def get_description(self) -> bytes | None: 

2580 """Get the description of this repository. 

2581 

2582 Returns: 

2583 Repository description as bytes 

2584 """ 

2585 return self._description 

2586 

2587 def _determine_file_mode(self) -> bool: 

2588 """Probe the file-system to determine whether permissions can be trusted. 

2589 

2590 Returns: True if permissions can be trusted, False otherwise. 

2591 """ 

2592 return sys.platform != "win32" 

2593 

2594 def _determine_symlinks(self) -> bool: 

2595 """Probe the file-system to determine whether permissions can be trusted. 

2596 

2597 Returns: True if permissions can be trusted, False otherwise. 

2598 """ 

2599 return sys.platform != "win32" 

2600 

2601 def _put_named_file(self, path: str, contents: bytes) -> None: 

2602 """Write a file to the control dir with the given name and contents. 

2603 

2604 Args: 

2605 path: The path to the file, relative to the control dir. 

2606 contents: A string to write to the file. 

2607 """ 

2608 self._named_files[path] = contents 

2609 

2610 def _del_named_file(self, path: str) -> None: 

2611 try: 

2612 del self._named_files[path] 

2613 except KeyError: 

2614 pass 

2615 

2616 def get_named_file( 

2617 self, 

2618 path: str | bytes, 

2619 basedir: str | None = None, 

2620 ) -> BytesIO | None: 

2621 """Get a file from the control dir with a specific name. 

2622 

2623 Although the filename should be interpreted as a filename relative to 

2624 the control dir in a disk-baked Repo, the object returned need not be 

2625 pointing to a file in that location. 

2626 

2627 Args: 

2628 path: The path to the file, relative to the control dir. 

2629 basedir: Optional base directory for the path 

2630 Returns: An open file object, or None if the file does not exist. 

2631 """ 

2632 path_str = path.decode() if isinstance(path, bytes) else path 

2633 contents = self._named_files.get(path_str, None) 

2634 if contents is None: 

2635 return None 

2636 return BytesIO(contents) 

2637 

def open_index(self, config: "Config | None" = None) -> "Index":
    """Refuse to open an index: this repository is bare.

    Args:
      config: Unused; kept for signature compatibility with ``BaseRepo``.

    Raises:
      NoIndexPresent: Always, since no index is present
    """
    raise NoIndexPresent()

2648 

2649 def _init_config(self, config: "ConfigFile") -> None: 

2650 """Initialize repository configuration for MemoryRepo.""" 

2651 self._config = config 

2652 

def get_config(self) -> "ConfigFile":
    """Return the configuration object held by this repository.

    Returns: `ConfigFile` object.
    """
    current = self._config
    return current

2659 

def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Get the rebase state manager for this in-memory repository.

    Returns: MemoryRebaseStateManager instance constructed with this
      repository
    """
    from .rebase import MemoryRebaseStateManager

    manager = MemoryRebaseStateManager(self)
    return manager

2668 

def get_blob_normalizer(
    self, config: "Config | None" = None
) -> "FilterBlobNormalizer":
    """Build a blob normalizer for checkin/checkout operations.

    The filter context is created on first use and stored on the
    repository; later calls reuse it after refreshing it with the config
    in effect for the call.

    Args:
      config: Configuration to consult for filter setup. If None,
        falls back to ``self.get_config_stack()``.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    if config is None:
        config = self.get_config_stack()
    git_attributes = self.get_gitattributes()

    context = self.filter_context
    if context is not None:
        context.refresh_config(config)
    else:
        context = FilterContext(FilterRegistry(config, self))
        self.filter_context = context

    return FilterBlobNormalizer(config, git_attributes, filter_context=context)

2693 

def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Memory repos have no working tree or gitattributes files, so the
    result is always an empty ``GitAttributes``; ``tree`` is not consulted.
    """
    from .attrs import GitAttributes

    no_patterns: list = []
    return GitAttributes(no_patterns)

2701 

def close(self) -> None:
    """Close any resources opened by this repository.

    A filter context, if one was created, is closed and cleared first;
    the object store is closed afterwards.
    """
    context = self.filter_context
    if context is not None:
        context.close()
        self.filter_context = None

    # Close object store to release pack files
    self.object_store.close()

2710 

def do_commit(
    self,
    message: bytes | None = None,
    committer: bytes | None = None,
    author: bytes | None = None,
    commit_timestamp: float | None = None,
    commit_timezone: int | None = None,
    author_timestamp: float | None = None,
    author_timezone: int | None = None,
    tree: ObjectID | None = None,
    encoding: bytes | None = None,
    ref: Ref | None = HEADREF,
    merge_heads: list[ObjectID] | None = None,
    no_verify: bool = False,
    sign: bool = False,
    config: "Config | None" = None,
) -> bytes:
    """Create a new commit.

    This is a simplified implementation for in-memory repositories that
    doesn't support worktree operations or hooks.

    Args:
      message: Commit message. A callable is also accepted; it is invoked
        as ``message(self, commit)`` and must return the message.
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit timestamp)
      author_timezone: Author timestamp timezone (defaults to commit timezone)
      tree: SHA1 of the tree root to use
      encoding: Encoding
      ref: Optional ref to commit to (defaults to current branch).
        If None, creates a dangling commit without updating any ref.
      merge_heads: Merge heads
      no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
      sign: GPG Sign the commit (ignored for MemoryRepo)
      config: Configuration to consult for committer/author identity. If
        None, falls back to ``self.get_config_stack()``.

    Returns:
      New commit SHA1

    Raises:
      ValueError: If ``tree`` is missing or has the wrong length, or if no
        commit message is available.
      CommitError: If the ref could not be updated because it changed
        during the commit.
    """
    import time

    from .objects import Commit

    if tree is None:
        raise ValueError("tree must be specified for MemoryRepo")

    c = Commit()
    if len(tree) != self.object_format.hex_length:
        raise ValueError(
            f"tree must be a {self.object_format.hex_length}-character hex sha string"
        )
    c.tree = tree

    if config is None:
        config = self.get_config_stack()
    if merge_heads is None:
        merge_heads = []
    if committer is None:
        committer = get_user_identity(config, kind="COMMITTER")
    check_user_identity(committer)
    c.committer = committer
    if commit_timestamp is None:
        commit_timestamp = time.time()
    c.commit_time = int(commit_timestamp)
    if commit_timezone is None:
        commit_timezone = 0
    c.commit_timezone = commit_timezone
    if author is None:
        author = get_user_identity(config, kind="AUTHOR")
    c.author = author
    check_user_identity(author)
    if author_timestamp is None:
        author_timestamp = commit_timestamp
    c.author_time = int(author_timestamp)
    if author_timezone is None:
        author_timezone = commit_timezone
    c.author_timezone = author_timezone
    if encoding is None:
        try:
            encoding = config.get(("i18n",), "commitEncoding")
        except KeyError:
            pass
    if encoding is not None:
        c.encoding = encoding

    # A callable message is resolved against the partially built commit.
    if callable(message):
        message = message(self, c)
        if message is None:
            raise ValueError("Message callback returned None")

    if message is None:
        raise ValueError("No commit message specified")

    c.message = message

    if ref is None:
        # Create a dangling commit
        c.parents = merge_heads
        self.object_store.add_object(c)
        return c.id

    # Only the ref lookup decides between "update" and "create". Keeping
    # the try this narrow stops a KeyError raised while storing the object
    # or updating the ref from being mistaken for a missing ref, which
    # would re-create the commit without its parent.
    try:
        old_head = self.refs[ref]
    except KeyError:
        c.parents = merge_heads
        self.object_store.add_object(c)
        ok = self.refs.add_if_new(
            ref,
            c.id,
            message=b"commit: " + message,
            committer=committer,
            timestamp=int(commit_timestamp),
            timezone=commit_timezone,
        )
    else:
        c.parents = [old_head, *merge_heads]
        self.object_store.add_object(c)
        ok = self.refs.set_if_equals(
            ref,
            old_head,
            c.id,
            message=b"commit: " + message,
            committer=committer,
            timestamp=int(commit_timestamp),
            timezone=commit_timezone,
        )
    if not ok:
        from .errors import CommitError

        raise CommitError(f"{ref!r} changed during commit")

    return c.id

2846 

@classmethod
def init_bare(
    cls,
    objects: Iterable[ShaFile],
    refs: Mapping[Ref, ObjectID],
    format: int | None = None,
    object_format: str | None = None,
) -> "MemoryRepo":
    """Create a new bare repository in memory.

    The objects are added to the new repository's object store first, then
    each ref is added with ``add_if_new``, and finally the initial
    repository files are written.

    Args:
      objects: Objects for the new repository,
        as iterable
      refs: Refs as dictionary, mapping names
        to object SHA1s
      format: Repository format version (defaults to 0)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: the new `MemoryRepo`
    """
    repo = cls()
    store = repo.object_store
    for obj in objects:
        store.add_object(obj)
    for name in refs:
        repo.refs.add_if_new(name, refs[name])
    repo._init_files(bare=True, format=format, object_format=object_format)
    return repo