Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/repo.py: 44%


839 statements  

1# repo.py -- For dealing with git repositories. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Repository access. 

24 

25This module contains the base class for git repositories 

26(BaseRepo) and an implementation which uses a repository on 

27local disk (Repo). 

28 

29""" 

30 

31import os 

32import stat 

33import sys 

34import time 

35import warnings 

36from io import BytesIO 

37from typing import ( 

38 TYPE_CHECKING, 

39 Any, 

40 BinaryIO, 

41 Callable, 

42 Dict, 

43 FrozenSet, 

44 Iterable, 

45 List, 

46 Optional, 

47 Set, 

48 Tuple, 

49 Union, 

50) 

51 

52if TYPE_CHECKING: 

53 # There are no circular imports here, but we try to defer imports as long 

54 # as possible to reduce start-up time for anything that doesn't need 

55 # these imports. 

56 from .config import ConfigFile, StackedConfig 

57 from .index import Index 

58 

59from .errors import ( 

60 CommitError, 

61 HookError, 

62 NoIndexPresent, 

63 NotBlobError, 

64 NotCommitError, 

65 NotGitRepository, 

66 NotTagError, 

67 NotTreeError, 

68 RefFormatError, 

69) 

70from .file import GitFile 

71from .hooks import ( 

72 CommitMsgShellHook, 

73 Hook, 

74 PostCommitShellHook, 

75 PostReceiveShellHook, 

76 PreCommitShellHook, 

77) 

78from .line_ending import BlobNormalizer, TreeBlobNormalizer 

79from .object_store import ( 

80 DiskObjectStore, 

81 MemoryObjectStore, 

82 MissingObjectFinder, 

83 ObjectStoreGraphWalker, 

84 PackBasedObjectStore, 

85 peel_sha, 

86) 

87from .objects import ( 

88 Blob, 

89 Commit, 

90 ObjectID, 

91 ShaFile, 

92 Tag, 

93 Tree, 

94 check_hexsha, 

95 valid_hexsha, 

96) 

97from .pack import generate_unpacked_objects 

98from .refs import ( 

99 ANNOTATED_TAG_SUFFIX, # noqa: F401 

100 LOCAL_BRANCH_PREFIX, 

101 LOCAL_TAG_PREFIX, # noqa: F401 

102 SYMREF, # noqa: F401 

103 DictRefsContainer, 

104 DiskRefsContainer, 

105 InfoRefsContainer, # noqa: F401 

106 Ref, 

107 RefsContainer, 

108 _set_default_branch, 

109 _set_head, 

110 _set_origin_head, 

111 check_ref_format, # noqa: F401 

112 read_packed_refs, # noqa: F401 

113 read_packed_refs_with_peeled, # noqa: F401 

114 serialize_refs, 

115 write_packed_refs, # noqa: F401 

116) 

117 

118CONTROLDIR = ".git" 

119OBJECTDIR = "objects" 

120REFSDIR = "refs" 

121REFSDIR_TAGS = "tags" 

122REFSDIR_HEADS = "heads" 

123INDEX_FILENAME = "index" 

124COMMONDIR = "commondir" 

125GITDIR = "gitdir" 

126WORKTREES = "worktrees" 

127 

128BASE_DIRECTORIES = [ 

129 ["branches"], 

130 [REFSDIR], 

131 [REFSDIR, REFSDIR_TAGS], 

132 [REFSDIR, REFSDIR_HEADS], 

133 ["hooks"], 

134 ["info"], 

135] 

136 

137DEFAULT_BRANCH = b"master" 

138 

139 

140class InvalidUserIdentity(Exception): 

141 """User identity is not of the format 'user <email>'.""" 

142 

143 def __init__(self, identity) -> None: 

144 self.identity = identity 

145 

146 

147class DefaultIdentityNotFound(Exception): 

148 """Default identity could not be determined.""" 

149 

150 

151# TODO(jelmer): Cache? 

152def _get_default_identity() -> Tuple[str, str]: 

153 import socket 

154 

155 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"): 

156 username = os.environ.get(name) 

157 if username: 

158 break 

159 else: 

160 username = None 

161 

162 try: 

163 import pwd 

164 except ImportError: 

165 fullname = None 

166 else: 

167 try: 

168 entry = pwd.getpwuid(os.getuid()) # type: ignore 

169 except KeyError: 

170 fullname = None 

171 else: 

172 if getattr(entry, "gecos", None): 

173 fullname = entry.pw_gecos.split(",")[0] 

174 else: 

175 fullname = None 

176 if username is None: 

177 username = entry.pw_name 

178 if not fullname: 

179 if username is None: 

180 raise DefaultIdentityNotFound("no username found") 

181 fullname = username 

182 email = os.environ.get("EMAIL") 

183 if email is None: 

184 if username is None: 

185 raise DefaultIdentityNotFound("no username found") 

186 email = f"{username}@{socket.gethostname()}" 

187 return (fullname, email) 

188 

189 

190def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes: 

191 """Determine the identity to use for new commits. 

192 

193 If kind is set, this first checks 

194 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL. 

195 

196 If those variables are not set, then it will fall back 

197 to reading the user.name and user.email settings from 

198 the specified configuration. 

199 

200 If that also fails, then it will fall back to using 

201 the current user's identity as obtained from the host 

202 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f)). 

203 

204 Args: 

205 kind: Optional kind to return identity for, 

206 usually either "AUTHOR" or "COMMITTER". 

207 

208 Returns: 

209 A user identity 

210 """ 

211 user: Optional[bytes] = None 

212 email: Optional[bytes] = None 

213 if kind: 

214 user_uc = os.environ.get("GIT_" + kind + "_NAME") 

215 if user_uc is not None: 

216 user = user_uc.encode("utf-8") 

217 email_uc = os.environ.get("GIT_" + kind + "_EMAIL") 

218 if email_uc is not None: 

219 email = email_uc.encode("utf-8") 

220 if user is None: 

221 try: 

222 user = config.get(("user",), "name") 

223 except KeyError: 

224 user = None 

225 if email is None: 

226 try: 

227 email = config.get(("user",), "email") 

228 except KeyError: 

229 email = None 

230 default_user, default_email = _get_default_identity() 

231 if user is None: 

232 user = default_user.encode("utf-8") 

233 if email is None: 

234 email = default_email.encode("utf-8") 

235 if email.startswith(b"<") and email.endswith(b">"): 

236 email = email[1:-1] 

237 return user + b" <" + email + b">" 

238 

239 

240def check_user_identity(identity): 

241 """Verify that a user identity is formatted correctly. 

242 

243 Args: 

244 identity: User identity bytestring 

245 Raises: 

246 InvalidUserIdentity: Raised when identity is invalid 

247 """ 

248 try: 

249 fst, snd = identity.split(b" <", 1) 

250 except ValueError as exc: 

251 raise InvalidUserIdentity(identity) from exc 

252 if b">" not in snd: 

253 raise InvalidUserIdentity(identity) 

254 if b"\0" in identity or b"\n" in identity: 

255 raise InvalidUserIdentity(identity) 

256 
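# Usage sketch (illustrative only, not part of dulwich): how the identity helpers
# above are typically combined. "/path/to/repo" is a placeholder for an existing
# repository.
def _example_identity_helpers():
    with Repo("/path/to/repo") as repo:
        # Environment variables (GIT_COMMITTER_NAME/EMAIL), then user.name and
        # user.email from the config stack, then the host account are consulted.
        committer = get_user_identity(repo.get_config_stack(), kind="COMMITTER")
        check_user_identity(committer)  # raises InvalidUserIdentity if malformed
        return committer  # e.g. b"Jane Doe <jane@example.com>"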

257 

258def parse_graftpoints( 

259 graftpoints: Iterable[bytes], 

260) -> Dict[bytes, List[bytes]]: 

261 """Convert a list of graftpoints into a dict. 

262 

263 Args: 

264 graftpoints: Iterator of graftpoint lines 

265 

266 Each line is formatted as: 

267 <commit sha1> <parent sha1> [<parent sha1>]* 

268 

269 Resulting dictionary is: 

270 <commit sha1>: [<parent sha1>*] 

271 

272 https://git.wiki.kernel.org/index.php/GraftPoint 

273 """ 

274 grafts = {} 

275 for line in graftpoints: 

276 raw_graft = line.split(None, 1) 

277 

278 commit = raw_graft[0] 

279 if len(raw_graft) == 2: 

280 parents = raw_graft[1].split() 

281 else: 

282 parents = [] 

283 

284 for sha in [commit, *parents]: 

285 check_hexsha(sha, "Invalid graftpoint") 

286 

287 grafts[commit] = parents 

288 return grafts 

289 

290 

291def serialize_graftpoints(graftpoints: Dict[bytes, List[bytes]]) -> bytes: 

292 """Convert a dictionary of grafts into string. 

293 

294 The graft dictionary is: 

295 <commit sha1>: [<parent sha1>*] 

296 

297 Each line is formatted as: 

298 <commit sha1> <parent sha1> [<parent sha1>]* 

299 

300 https://git.wiki.kernel.org/index.php/GraftPoint 

301 

302 """ 

303 graft_lines = [] 

304 for commit, parents in graftpoints.items(): 

305 if parents: 

306 graft_lines.append(commit + b" " + b" ".join(parents)) 

307 else: 

308 graft_lines.append(commit) 

309 return b"\n".join(graft_lines) 

310 
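# Example (illustrative only): parse_graftpoints() and serialize_graftpoints()
# round-trip a simple graft line. The SHAs are synthetic 40-character hex values.
def _example_graftpoints_roundtrip():
    commit = b"a" * 40
    parent = b"b" * 40
    grafts = parse_graftpoints([commit + b" " + parent])
    assert grafts == {commit: [parent]}
    assert serialize_graftpoints(grafts) == commit + b" " + parent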

311 

312def _set_filesystem_hidden(path): 

313 """Mark path as to be hidden if supported by platform and filesystem. 

314 

315 On win32 uses SetFileAttributesW api: 

316 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw> 

317 """ 

318 if sys.platform == "win32": 

319 import ctypes 

320 from ctypes.wintypes import BOOL, DWORD, LPCWSTR 

321 

322 FILE_ATTRIBUTE_HIDDEN = 2 

323 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)( 

324 ("SetFileAttributesW", ctypes.windll.kernel32) 

325 ) 

326 

327 if isinstance(path, bytes): 

328 path = os.fsdecode(path) 

329 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN): 

330 pass # Could raise or log `ctypes.WinError()` here 

331 

332 # Could implement other platform specific filesystem hiding here 

333 

334 

335class ParentsProvider: 

336 def __init__(self, store, grafts={}, shallows=[]) -> None: 

337 self.store = store 

338 self.grafts = grafts 

339 self.shallows = set(shallows) 

340 

341 def get_parents(self, commit_id, commit=None): 

342 try: 

343 return self.grafts[commit_id] 

344 except KeyError: 

345 pass 

346 if commit_id in self.shallows: 

347 return [] 

348 if commit is None: 

349 commit = self.store[commit_id] 

350 return commit.parents 

351 

352 
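# Example (illustrative only): ParentsProvider consults the grafts dict and the
# shallow set before falling back to the object store, so these lookups need no
# real store (None is passed as a stand-in).
def _example_parents_provider():
    grafted = b"c" * 40
    shallow = b"d" * 40
    provider = ParentsProvider(None, grafts={grafted: [b"e" * 40]}, shallows=[shallow])
    assert provider.get_parents(grafted) == [b"e" * 40]  # graft entry wins
    assert provider.get_parents(shallow) == []  # shallow commits report no parents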

353class BaseRepo: 

354 """Base class for a git repository. 

355 

356 This base class is meant to be used for Repository implementations that e.g. 

357 work on top of a different transport than a standard filesystem path. 

358 

359 Attributes: 

360 object_store: Dictionary-like object for accessing 

361 the objects 

362 refs: Dictionary-like object with the refs in this 

363 repository 

364 """ 

365 

366 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None: 

367 """Open a repository. 

368 

369 This shouldn't be called directly, but rather through one of the 

370 base classes, such as MemoryRepo or Repo. 

371 

372 Args: 

373 object_store: Object store to use 

374 refs: Refs container to use 

375 """ 

376 self.object_store = object_store 

377 self.refs = refs 

378 

379 self._graftpoints: Dict[bytes, List[bytes]] = {} 

380 self.hooks: Dict[str, Hook] = {} 

381 

382 def _determine_file_mode(self) -> bool: 

383 """Probe the file-system to determine whether permissions can be trusted. 

384 

385 Returns: True if permissions can be trusted, False otherwise. 

386 """ 

387 raise NotImplementedError(self._determine_file_mode) 

388 

389 def _determine_symlinks(self) -> bool: 

390 """Probe the filesystem to determine whether symlinks can be created. 

391 

392 Returns: True if symlinks can be created, False otherwise. 

393 """ 

394 # For now, just mimic the old behaviour 

395 return sys.platform != "win32" 

396 

397 def _init_files(self, bare: bool, symlinks: Optional[bool] = None) -> None: 

398 """Initialize a default set of named files.""" 

399 from .config import ConfigFile 

400 

401 self._put_named_file("description", b"Unnamed repository") 

402 f = BytesIO() 

403 cf = ConfigFile() 

404 cf.set("core", "repositoryformatversion", "0") 

405 if self._determine_file_mode(): 

406 cf.set("core", "filemode", True) 

407 else: 

408 cf.set("core", "filemode", False) 

409 

410 if symlinks is None and not bare: 

411 symlinks = self._determine_symlinks() 

412 

413 if symlinks is False: 

414 cf.set("core", "symlinks", symlinks) 

415 

416 cf.set("core", "bare", bare) 

417 cf.set("core", "logallrefupdates", True) 

418 cf.write_to_file(f) 

419 self._put_named_file("config", f.getvalue()) 

420 self._put_named_file(os.path.join("info", "exclude"), b"") 

421 

422 def get_named_file(self, path: str) -> Optional[BinaryIO]: 

423 """Get a file from the control dir with a specific name. 

424 

425 Although the filename should be interpreted as a filename relative to 

426 the control dir in a disk-based Repo, the object returned need not be 

427 pointing to a file in that location. 

428 

429 Args: 

430 path: The path to the file, relative to the control dir. 

431 Returns: An open file object, or None if the file does not exist. 

432 """ 

433 raise NotImplementedError(self.get_named_file) 

434 

435 def _put_named_file(self, path: str, contents: bytes): 

436 """Write a file to the control dir with the given name and contents. 

437 

438 Args: 

439 path: The path to the file, relative to the control dir. 

440 contents: A string to write to the file. 

441 """ 

442 raise NotImplementedError(self._put_named_file) 

443 

444 def _del_named_file(self, path: str): 

445 """Delete a file in the control directory with the given name.""" 

446 raise NotImplementedError(self._del_named_file) 

447 

448 def open_index(self) -> "Index": 

449 """Open the index for this repository. 

450 

451 Raises: 

452 NoIndexPresent: If no index is present 

453 Returns: The matching `Index` 

454 """ 

455 raise NotImplementedError(self.open_index) 

456 

457 def fetch(self, target, determine_wants=None, progress=None, depth=None): 

458 """Fetch objects into another repository. 

459 

460 Args: 

461 target: The target repository 

462 determine_wants: Optional function to determine what refs to 

463 fetch. 

464 progress: Optional progress function 

465 depth: Optional shallow fetch depth 

466 Returns: The local refs 

467 """ 

468 if determine_wants is None: 

469 determine_wants = target.object_store.determine_wants_all 

470 count, pack_data = self.fetch_pack_data( 

471 determine_wants, 

472 target.get_graph_walker(), 

473 progress=progress, 

474 depth=depth, 

475 ) 

476 target.object_store.add_pack_data(count, pack_data, progress) 

477 return self.get_refs() 

478 
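# Usage sketch (illustrative only; the temporary directories and identities are
# assumptions, not dulwich defaults): fetching all objects from one local
# repository into another and then pointing a branch at the fetched commit.
#
#     import tempfile
#     src = Repo.init(tempfile.mkdtemp())
#     dst = Repo.init(tempfile.mkdtemp())
#     commit_id = src.do_commit(
#         b"initial commit",
#         committer=b"Example Committer <committer@example.com>",
#         author=b"Example Author <author@example.com>",
#     )
#     src.fetch(dst)  # copies the missing objects into dst's object store
#     dst.refs[b"refs/heads/master"] = commit_id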

479 def fetch_pack_data( 

480 self, 

481 determine_wants, 

482 graph_walker, 

483 progress, 

484 get_tagged=None, 

485 depth=None, 

486 ): 

487 """Fetch the pack data required for a set of revisions. 

488 

489 Args: 

490 determine_wants: Function that takes a dictionary with heads 

491 and returns the list of heads to fetch. 

492 graph_walker: Object that can iterate over the list of revisions 

493 to fetch and has an "ack" method that will be called to acknowledge 

494 that a revision is present. 

495 progress: Simple progress function that will be called with 

496 updated progress strings. 

497 get_tagged: Function that returns a dict of pointed-to sha -> 

498 tag sha for including tags. 

499 depth: Shallow fetch depth 

500 Returns: count and iterator over pack data 

501 """ 

502 missing_objects = self.find_missing_objects( 

503 determine_wants, graph_walker, progress, get_tagged, depth=depth 

504 ) 

505 remote_has = missing_objects.get_remote_has() 

506 object_ids = list(missing_objects) 

507 return len(object_ids), generate_unpacked_objects( 

508 self.object_store, object_ids, progress=progress, other_haves=remote_has 

509 ) 

510 

511 def find_missing_objects( 

512 self, 

513 determine_wants, 

514 graph_walker, 

515 progress, 

516 get_tagged=None, 

517 depth=None, 

518 ) -> Optional[MissingObjectFinder]: 

519 """Fetch the missing objects required for a set of revisions. 

520 

521 Args: 

522 determine_wants: Function that takes a dictionary with heads 

523 and returns the list of heads to fetch. 

524 graph_walker: Object that can iterate over the list of revisions 

525 to fetch and has an "ack" method that will be called to acknowledge 

526 that a revision is present. 

527 progress: Simple progress function that will be called with 

528 updated progress strings. 

529 get_tagged: Function that returns a dict of pointed-to sha -> 

530 tag sha for including tags. 

531 depth: Shallow fetch depth 

532 Returns: MissingObjectFinder over the objects to send (supports len()), or None when a shallow request leaves nothing to send 

533 """ 

534 if depth not in (None, 0): 

535 raise NotImplementedError("depth not supported yet") 

536 

537 refs = serialize_refs(self.object_store, self.get_refs()) 

538 

539 wants = determine_wants(refs) 

540 if not isinstance(wants, list): 

541 raise TypeError("determine_wants() did not return a list") 

542 

543 shallows: FrozenSet[ObjectID] = getattr(graph_walker, "shallow", frozenset()) 

544 unshallows: FrozenSet[ObjectID] = getattr( 

545 graph_walker, "unshallow", frozenset() 

546 ) 

547 

548 if wants == []: 

549 # TODO(dborowitz): find a way to short-circuit that doesn't change 

550 # this interface. 

551 

552 if shallows or unshallows: 

553 # Do not send a pack in shallow short-circuit path 

554 return None 

555 

556 class DummyMissingObjectFinder: 

557 def get_remote_has(self): 

558 return None 

559 

560 def __len__(self) -> int: 

561 return 0 

562 

563 def __iter__(self): 

564 yield from [] 

565 

566 return DummyMissingObjectFinder() # type: ignore 

567 

568 # If the graph walker is set up with an implementation that can 

569 # ACK/NAK to the wire, it will write data to the client through 

570 # this call as a side-effect. 

571 haves = self.object_store.find_common_revisions(graph_walker) 

572 

573 # Deal with shallow requests separately because the haves do 

574 # not reflect what objects are missing 

575 if shallows or unshallows: 

576 # TODO: filter the haves commits from iter_shas. the specific 

577 # commits aren't missing. 

578 haves = [] 

579 

580 parents_provider = ParentsProvider(self.object_store, shallows=shallows) 

581 

582 def get_parents(commit): 

583 return parents_provider.get_parents(commit.id, commit) 

584 

585 return MissingObjectFinder( 

586 self.object_store, 

587 haves=haves, 

588 wants=wants, 

589 shallow=self.get_shallow(), 

590 progress=progress, 

591 get_tagged=get_tagged, 

592 get_parents=get_parents, 

593 ) 

594 

595 def generate_pack_data( 

596 self, 

597 have: List[ObjectID], 

598 want: List[ObjectID], 

599 progress: Optional[Callable[[str], None]] = None, 

600 ofs_delta: Optional[bool] = None, 

601 ): 

602 """Generate pack data objects for a set of wants/haves. 

603 

604 Args: 

605 have: List of SHA1s of objects that should not be sent 

606 want: List of SHA1s of objects that should be sent 

607 ofs_delta: Whether OFS deltas can be included 

608 progress: Optional progress reporting method 

609 """ 

610 return self.object_store.generate_pack_data( 

611 have, 

612 want, 

613 shallow=self.get_shallow(), 

614 progress=progress, 

615 ofs_delta=ofs_delta, 

616 ) 

617 

618 def get_graph_walker( 

619 self, heads: Optional[List[ObjectID]] = None 

620 ) -> ObjectStoreGraphWalker: 

621 """Retrieve a graph walker. 

622 

623 A graph walker is used by a remote repository (or proxy) 

624 to find out which objects are present in this repository. 

625 

626 Args: 

627 heads: Repository heads to use (optional) 

628 Returns: A graph walker object 

629 """ 

630 if heads is None: 

631 heads = [ 

632 sha 

633 for sha in self.refs.as_dict(b"refs/heads").values() 

634 if sha in self.object_store 

635 ] 

636 parents_provider = ParentsProvider(self.object_store) 

637 return ObjectStoreGraphWalker( 

638 heads, parents_provider.get_parents, shallow=self.get_shallow() 

639 ) 

640 

641 def get_refs(self) -> Dict[bytes, bytes]: 

642 """Get dictionary with all refs. 

643 

644 Returns: A ``dict`` mapping ref names to SHA1s 

645 """ 

646 return self.refs.as_dict() 

647 

648 def head(self) -> bytes: 

649 """Return the SHA1 pointed at by HEAD.""" 

650 return self.refs[b"HEAD"] 

651 
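# Usage sketch (illustrative only; "/path/to/repo" is a placeholder): get_refs()
# resolves symbolic refs, so its b"HEAD" entry matches head().
#
#     with Repo("/path/to/repo") as repo:
#         refs = repo.get_refs()    # e.g. {b"HEAD": sha, b"refs/heads/master": sha}
#         assert refs[b"HEAD"] == repo.head()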

652 def _get_object(self, sha, cls): 

653 assert len(sha) in (20, 40) 

654 ret = self.get_object(sha) 

655 if not isinstance(ret, cls): 

656 if cls is Commit: 

657 raise NotCommitError(ret) 

658 elif cls is Blob: 

659 raise NotBlobError(ret) 

660 elif cls is Tree: 

661 raise NotTreeError(ret) 

662 elif cls is Tag: 

663 raise NotTagError(ret) 

664 else: 

665 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}") 

666 return ret 

667 

668 def get_object(self, sha: bytes) -> ShaFile: 

669 """Retrieve the object with the specified SHA. 

670 

671 Args: 

672 sha: SHA to retrieve 

673 Returns: A ShaFile object 

674 Raises: 

675 KeyError: when the object can not be found 

676 """ 

677 return self.object_store[sha] 

678 

679 def parents_provider(self) -> ParentsProvider: 

680 return ParentsProvider( 

681 self.object_store, 

682 grafts=self._graftpoints, 

683 shallows=self.get_shallow(), 

684 ) 

685 

686 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> List[bytes]: 

687 """Retrieve the parents of a specific commit. 

688 

689 If the specific commit is a graftpoint, the graft parents 

690 will be returned instead. 

691 

692 Args: 

693 sha: SHA of the commit for which to retrieve the parents 

694 commit: Optional commit matching the sha 

695 Returns: List of parents 

696 """ 

697 return self.parents_provider().get_parents(sha, commit) 

698 

699 def get_config(self) -> "ConfigFile": 

700 """Retrieve the config object. 

701 

702 Returns: `ConfigFile` object for the ``.git/config`` file. 

703 """ 

704 raise NotImplementedError(self.get_config) 

705 

706 def get_worktree_config(self) -> "ConfigFile": 

707 """Retrieve the worktree config object.""" 

708 raise NotImplementedError(self.get_worktree_config) 

709 

710 def get_description(self): 

711 """Retrieve the description for this repository. 

712 

713 Returns: String with the description of the repository 

714 as set by the user. 

715 """ 

716 raise NotImplementedError(self.get_description) 

717 

718 def set_description(self, description): 

719 """Set the description for this repository. 

720 

721 Args: 

722 description: Text to set as description for this repository. 

723 """ 

724 raise NotImplementedError(self.set_description) 

725 

726 def get_config_stack(self) -> "StackedConfig": 

727 """Return a config stack for this repository. 

728 

729 This stack accesses the configuration for both this repository 

730 itself (.git/config) and the global configuration, which usually 

731 lives in ~/.gitconfig. 

732 

733 Returns: `Config` instance for this repository 

734 """ 

735 from .config import ConfigFile, StackedConfig 

736 

737 local_config = self.get_config() 

738 backends: List[ConfigFile] = [local_config] 

739 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False): 

740 backends.append(self.get_worktree_config()) 

741 

742 backends += StackedConfig.default_backends() 

743 return StackedConfig(backends, writable=local_config) 

744 
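# Usage sketch (illustrative only; the path and the fallback value are
# placeholders): reading a setting through the stacked configuration, which
# consults .git/config before the global backends.
#
#     with Repo("/path/to/repo") as repo:
#         config = repo.get_config_stack()
#         try:
#             name = config.get(("user",), "name")
#         except KeyError:
#             name = b"nobody"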

745 def get_shallow(self) -> Set[ObjectID]: 

746 """Get the set of shallow commits. 

747 

748 Returns: Set of shallow commits. 

749 """ 

750 f = self.get_named_file("shallow") 

751 if f is None: 

752 return set() 

753 with f: 

754 return {line.strip() for line in f} 

755 

756 def update_shallow(self, new_shallow, new_unshallow): 

757 """Update the list of shallow objects. 

758 

759 Args: 

760 new_shallow: Newly shallow objects 

761 new_unshallow: Newly no longer shallow objects 

762 """ 

763 shallow = self.get_shallow() 

764 if new_shallow: 

765 shallow.update(new_shallow) 

766 if new_unshallow: 

767 shallow.difference_update(new_unshallow) 

768 if shallow: 

769 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow])) 

770 else: 

771 self._del_named_file("shallow") 

772 

773 def get_peeled(self, ref: Ref) -> ObjectID: 

774 """Get the peeled value of a ref. 

775 

776 Args: 

777 ref: The refname to peel. 

778 Returns: The fully-peeled SHA1 of a tag object, after peeling all 

779 intermediate tags; if the original ref does not point to a tag, 

780 this will equal the original SHA1. 

781 """ 

782 cached = self.refs.get_peeled(ref) 

783 if cached is not None: 

784 return cached 

785 return peel_sha(self.object_store, self.refs[ref])[1].id 

786 

787 def get_walker(self, include: Optional[List[bytes]] = None, *args, **kwargs): 

788 """Obtain a walker for this repository. 

789 

790 Args: 

791 include: Iterable of SHAs of commits to include along with their 

792 ancestors. Defaults to [HEAD] 

793 exclude: Iterable of SHAs of commits to exclude along with their 

794 ancestors, overriding includes. 

795 order: ORDER_* constant specifying the order of results. 

796 Anything other than ORDER_DATE may result in O(n) memory usage. 

797 reverse: If True, reverse the order of output, requiring O(n) 

798 memory. 

799 max_entries: The maximum number of entries to yield, or None for 

800 no limit. 

801 paths: Iterable of file or subtree paths to show entries for. 

802 rename_detector: diff.RenameDetector object for detecting 

803 renames. 

804 follow: If True, follow path across renames/copies. Forces a 

805 default rename_detector. 

806 since: Timestamp to list commits after. 

807 until: Timestamp to list commits before. 

808 queue_cls: A class to use for a queue of commits, supporting the 

809 iterator protocol. The constructor takes a single argument, the 

810 Walker. 

811 Returns: A `Walker` object 

812 """ 

813 from .walk import Walker 

814 

815 if include is None: 

816 include = [self.head()] 

817 

818 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit) 

819 

820 return Walker(self.object_store, include, *args, **kwargs) 

821 
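# Usage sketch (illustrative only; "/path/to/repo" is a placeholder): walking
# the history reachable from HEAD and printing each commit.
#
#     with Repo("/path/to/repo") as repo:
#         for entry in repo.get_walker(max_entries=10):
#             print(entry.commit.id, entry.commit.message)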

822 def __getitem__(self, name: Union[ObjectID, Ref]): 

823 """Retrieve a Git object by SHA1 or ref. 

824 

825 Args: 

826 name: A Git object SHA1 or a ref name 

827 Returns: A `ShaFile` object, such as a Commit or Blob 

828 Raises: 

829 KeyError: when the specified ref or object does not exist 

830 """ 

831 if not isinstance(name, bytes): 

832 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}") 

833 if len(name) in (20, 40): 

834 try: 

835 return self.object_store[name] 

836 except (KeyError, ValueError): 

837 pass 

838 try: 

839 return self.object_store[self.refs[name]] 

840 except RefFormatError as exc: 

841 raise KeyError(name) from exc 

842 

843 def __contains__(self, name: bytes) -> bool: 

844 """Check if a specific Git object or ref is present. 

845 

846 Args: 

847 name: Git object SHA1 or ref name 

848 """ 

849 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)): 

850 return name in self.object_store or name in self.refs 

851 else: 

852 return name in self.refs 

853 

854 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None: 

855 """Set a ref. 

856 

857 Args: 

858 name: ref name 

859 value: Ref value - either a ShaFile object, or a hex sha 

860 """ 

861 if name.startswith(b"refs/") or name == b"HEAD": 

862 if isinstance(value, ShaFile): 

863 self.refs[name] = value.id 

864 elif isinstance(value, bytes): 

865 self.refs[name] = value 

866 else: 

867 raise TypeError(value) 

868 else: 

869 raise ValueError(name) 

870 

871 def __delitem__(self, name: bytes) -> None: 

872 """Remove a ref. 

873 

874 Args: 

875 name: Name of the ref to remove 

876 """ 

877 if name.startswith(b"refs/") or name == b"HEAD": 

878 del self.refs[name] 

879 else: 

880 raise ValueError(name) 

881 
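# Usage sketch (illustrative only; the path and branch name are placeholders):
# refs can be manipulated with item access, but only b"HEAD" and names under
# b"refs/" are accepted.
#
#     with Repo("/path/to/repo") as repo:
#         head = repo[b"HEAD"]                    # resolve HEAD to an object
#         repo[b"refs/heads/feature"] = head.id   # create or move a branch
#         assert b"refs/heads/feature" in repo
#         del repo[b"refs/heads/feature"]         # and remove it again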

882 def _get_user_identity( 

883 self, config: "StackedConfig", kind: Optional[str] = None 

884 ) -> bytes: 

885 """Determine the identity to use for new commits.""" 

886 warnings.warn( 

887 "use get_user_identity() rather than Repo._get_user_identity", 

888 DeprecationWarning, 

889 ) 

890 return get_user_identity(config) 

891 

892 def _add_graftpoints(self, updated_graftpoints: Dict[bytes, List[bytes]]): 

893 """Add or modify graftpoints. 

894 

895 Args: 

896 updated_graftpoints: Dict of commit shas to list of parent shas 

897 """ 

898 # Simple validation 

899 for commit, parents in updated_graftpoints.items(): 

900 for sha in [commit, *parents]: 

901 check_hexsha(sha, "Invalid graftpoint") 

902 

903 self._graftpoints.update(updated_graftpoints) 

904 

905 def _remove_graftpoints(self, to_remove: List[bytes] = []) -> None: 

906 """Remove graftpoints. 

907 

908 Args: 

909 to_remove: List of commit shas 

910 """ 

911 for sha in to_remove: 

912 del self._graftpoints[sha] 

913 

914 def _read_heads(self, name): 

915 f = self.get_named_file(name) 

916 if f is None: 

917 return [] 

918 with f: 

919 return [line.strip() for line in f.readlines() if line.strip()] 

920 

921 def do_commit( 

922 self, 

923 message: Optional[bytes] = None, 

924 committer: Optional[bytes] = None, 

925 author: Optional[bytes] = None, 

926 commit_timestamp=None, 

927 commit_timezone=None, 

928 author_timestamp=None, 

929 author_timezone=None, 

930 tree: Optional[ObjectID] = None, 

931 encoding: Optional[bytes] = None, 

932 ref: Ref = b"HEAD", 

933 merge_heads: Optional[List[ObjectID]] = None, 

934 no_verify: bool = False, 

935 sign: bool = False, 

936 ): 

937 """Create a new commit. 

938 

939 If not specified, committer and author default to 

940 get_user_identity(..., 'COMMITTER') 

941 and get_user_identity(..., 'AUTHOR') respectively. 

942 

943 Args: 

944 message: Commit message 

945 committer: Committer fullname 

946 author: Author fullname 

947 commit_timestamp: Commit timestamp (defaults to now) 

948 commit_timezone: Commit timestamp timezone (defaults to GMT) 

949 author_timestamp: Author timestamp (defaults to commit 

950 timestamp) 

951 author_timezone: Author timestamp timezone 

952 (defaults to commit timestamp timezone) 

953 tree: SHA1 of the tree root to use (if not specified the 

954 current index will be committed). 

955 encoding: Encoding 

956 ref: Optional ref to commit to (defaults to current branch) 

957 merge_heads: Merge heads (defaults to .git/MERGE_HEAD) 

958 no_verify: Skip pre-commit and commit-msg hooks 

959 sign: GPG Sign the commit (bool, defaults to False, 

960 pass True to use default GPG key, 

961 pass a str containing Key ID to use a specific GPG key) 

962 

963 Returns: 

964 New commit SHA1 

965 """ 

966 try: 

967 if not no_verify: 

968 self.hooks["pre-commit"].execute() 

969 except HookError as exc: 

970 raise CommitError(exc) from exc 

971 except KeyError: # no hook defined, silent fallthrough 

972 pass 

973 

974 c = Commit() 

975 if tree is None: 

976 index = self.open_index() 

977 c.tree = index.commit(self.object_store) 

978 else: 

979 if len(tree) != 40: 

980 raise ValueError("tree must be a 40-byte hex sha string") 

981 c.tree = tree 

982 

983 config = self.get_config_stack() 

984 if merge_heads is None: 

985 merge_heads = self._read_heads("MERGE_HEAD") 

986 if committer is None: 

987 committer = get_user_identity(config, kind="COMMITTER") 

988 check_user_identity(committer) 

989 c.committer = committer 

990 if commit_timestamp is None: 

991 # FIXME: Support GIT_COMMITTER_DATE environment variable 

992 commit_timestamp = time.time() 

993 c.commit_time = int(commit_timestamp) 

994 if commit_timezone is None: 

995 # FIXME: Use current user timezone rather than UTC 

996 commit_timezone = 0 

997 c.commit_timezone = commit_timezone 

998 if author is None: 

999 author = get_user_identity(config, kind="AUTHOR") 

1000 c.author = author 

1001 check_user_identity(author) 

1002 if author_timestamp is None: 

1003 # FIXME: Support GIT_AUTHOR_DATE environment variable 

1004 author_timestamp = commit_timestamp 

1005 c.author_time = int(author_timestamp) 

1006 if author_timezone is None: 

1007 author_timezone = commit_timezone 

1008 c.author_timezone = author_timezone 

1009 if encoding is None: 

1010 try: 

1011 encoding = config.get(("i18n",), "commitEncoding") 

1012 except KeyError: 

1013 pass # No dice 

1014 if encoding is not None: 

1015 c.encoding = encoding 

1016 if message is None: 

1017 # FIXME: Try to read commit message from .git/MERGE_MSG 

1018 raise ValueError("No commit message specified") 

1019 

1020 try: 

1021 if no_verify: 

1022 c.message = message 

1023 else: 

1024 c.message = self.hooks["commit-msg"].execute(message) 

1025 if c.message is None: 

1026 c.message = message 

1027 except HookError as exc: 

1028 raise CommitError(exc) from exc 

1029 except KeyError: # no hook defined, message not modified 

1030 c.message = message 

1031 

1032 keyid = sign if isinstance(sign, str) else None 

1033 

1034 if ref is None: 

1035 # Create a dangling commit 

1036 c.parents = merge_heads 

1037 if sign: 

1038 c.sign(keyid) 

1039 self.object_store.add_object(c) 

1040 else: 

1041 try: 

1042 old_head = self.refs[ref] 

1043 c.parents = [old_head, *merge_heads] 

1044 if sign: 

1045 c.sign(keyid) 

1046 self.object_store.add_object(c) 

1047 ok = self.refs.set_if_equals( 

1048 ref, 

1049 old_head, 

1050 c.id, 

1051 message=b"commit: " + message, 

1052 committer=committer, 

1053 timestamp=commit_timestamp, 

1054 timezone=commit_timezone, 

1055 ) 

1056 except KeyError: 

1057 c.parents = merge_heads 

1058 if sign: 

1059 c.sign(keyid) 

1060 self.object_store.add_object(c) 

1061 ok = self.refs.add_if_new( 

1062 ref, 

1063 c.id, 

1064 message=b"commit: " + message, 

1065 committer=committer, 

1066 timestamp=commit_timestamp, 

1067 timezone=commit_timezone, 

1068 ) 

1069 if not ok: 

1070 # Fail if the atomic compare-and-swap failed, leaving the 

1071 # commit and all its objects as garbage. 

1072 raise CommitError(f"{ref!r} changed during commit") 

1073 

1074 self._del_named_file("MERGE_HEAD") 

1075 

1076 try: 

1077 self.hooks["post-commit"].execute() 

1078 except HookError as e: # silent failure 

1079 warnings.warn(f"post-commit hook failed: {e}", UserWarning) 

1080 except KeyError: # no hook defined, silent fallthrough 

1081 pass 

1082 

1083 return c.id 

1084 
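# Example (illustrative only; "/path/to/repo" is a placeholder): creating a
# commit on an explicitly named ref with fixed identities and timestamps, so the
# resulting SHA is reproducible.
def _example_do_commit():
    with Repo("/path/to/repo") as repo:
        return repo.do_commit(
            b"describe the change",
            committer=b"Example Committer <committer@example.com>",
            author=b"Example Author <author@example.com>",
            commit_timestamp=1700000000,
            commit_timezone=0,
            ref=b"refs/heads/example",
        )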

1085 

1086def read_gitfile(f): 

1087 """Read a ``.git`` file. 

1088 

1089 The first line of the file should start with "gitdir: " 

1090 

1091 Args: 

1092 f: File-like object to read from 

1093 Returns: A path 

1094 """ 

1095 cs = f.read() 

1096 if not cs.startswith("gitdir: "): 

1097 raise ValueError("Expected file to start with 'gitdir: '") 

1098 return cs[len("gitdir: ") :].rstrip("\n") 

1099 

1100 

1101class UnsupportedVersion(Exception): 

1102 """Unsupported repository version.""" 

1103 

1104 def __init__(self, version) -> None: 

1105 self.version = version 

1106 

1107 

1108class UnsupportedExtension(Exception): 

1109 """Unsupported repository extension.""" 

1110 

1111 def __init__(self, extension) -> None: 

1112 self.extension = extension 

1113 

1114 

1115class Repo(BaseRepo): 

1116 """A git repository backed by local disk. 

1117 

1118 To open an existing repository, call the constructor with 

1119 the path of the repository. 

1120 

1121 To create a new repository, use the Repo.init class method. 

1122 

1123 Note that a repository object may hold on to resources such 

1124 as file handles for performance reasons; call .close() to free 

1125 up those resources. 

1126 

1127 Attributes: 

1128 path: Path to the working copy (if it exists) or repository control 

1129 directory (if the repository is bare) 

1130 bare: Whether this is a bare repository 

1131 """ 

1132 

1133 path: str 

1134 bare: bool 

1135 

1136 def __init__( 

1137 self, 

1138 root: str, 

1139 object_store: Optional[PackBasedObjectStore] = None, 

1140 bare: Optional[bool] = None, 

1141 ) -> None: 

1142 hidden_path = os.path.join(root, CONTROLDIR) 

1143 if bare is None: 

1144 if os.path.isfile(hidden_path) or os.path.isdir( 

1145 os.path.join(hidden_path, OBJECTDIR) 

1146 ): 

1147 bare = False 

1148 elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir( 

1149 os.path.join(root, REFSDIR) 

1150 ): 

1151 bare = True 

1152 else: 

1153 raise NotGitRepository( 

1154 "No git repository was found at {path}".format(**dict(path=root)) 

1155 ) 

1156 

1157 self.bare = bare 

1158 if bare is False: 

1159 if os.path.isfile(hidden_path): 

1160 with open(hidden_path) as f: 

1161 path = read_gitfile(f) 

1162 self._controldir = os.path.join(root, path) 

1163 else: 

1164 self._controldir = hidden_path 

1165 else: 

1166 self._controldir = root 

1167 commondir = self.get_named_file(COMMONDIR) 

1168 if commondir is not None: 

1169 with commondir: 

1170 self._commondir = os.path.join( 

1171 self.controldir(), 

1172 os.fsdecode(commondir.read().rstrip(b"\r\n")), 

1173 ) 

1174 else: 

1175 self._commondir = self._controldir 

1176 self.path = root 

1177 config = self.get_config() 

1178 try: 

1179 repository_format_version = config.get("core", "repositoryformatversion") 

1180 format_version = ( 

1181 0 

1182 if repository_format_version is None 

1183 else int(repository_format_version) 

1184 ) 

1185 except KeyError: 

1186 format_version = 0 

1187 

1188 if format_version not in (0, 1): 

1189 raise UnsupportedVersion(format_version) 

1190 

1191 for extension, _value in config.items((b"extensions",)): 

1192 if extension.lower() not in (b"worktreeconfig",): 

1193 raise UnsupportedExtension(extension) 

1194 

1195 if object_store is None: 

1196 object_store = DiskObjectStore.from_config( 

1197 os.path.join(self.commondir(), OBJECTDIR), config 

1198 ) 

1199 refs = DiskRefsContainer( 

1200 self.commondir(), self._controldir, logger=self._write_reflog 

1201 ) 

1202 BaseRepo.__init__(self, object_store, refs) 

1203 

1204 self._graftpoints = {} 

1205 graft_file = self.get_named_file( 

1206 os.path.join("info", "grafts"), basedir=self.commondir() 

1207 ) 

1208 if graft_file: 

1209 with graft_file: 

1210 self._graftpoints.update(parse_graftpoints(graft_file)) 

1211 graft_file = self.get_named_file("shallow", basedir=self.commondir()) 

1212 if graft_file: 

1213 with graft_file: 

1214 self._graftpoints.update(parse_graftpoints(graft_file)) 

1215 

1216 self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir()) 

1217 self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir()) 

1218 self.hooks["post-commit"] = PostCommitShellHook(self.controldir()) 

1219 self.hooks["post-receive"] = PostReceiveShellHook(self.controldir()) 

1220 

1221 def _write_reflog( 

1222 self, ref, old_sha, new_sha, committer, timestamp, timezone, message 

1223 ): 

1224 from .reflog import format_reflog_line 

1225 

1226 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref)) 

1227 try: 

1228 os.makedirs(os.path.dirname(path)) 

1229 except FileExistsError: 

1230 pass 

1231 if committer is None: 

1232 config = self.get_config_stack() 

1233 committer = self._get_user_identity(config) 

1234 check_user_identity(committer) 

1235 if timestamp is None: 

1236 timestamp = int(time.time()) 

1237 if timezone is None: 

1238 timezone = 0 # FIXME 

1239 with open(path, "ab") as f: 

1240 f.write( 

1241 format_reflog_line( 

1242 old_sha, new_sha, committer, timestamp, timezone, message 

1243 ) 

1244 + b"\n" 

1245 ) 

1246 

1247 @classmethod 

1248 def discover(cls, start="."): 

1249 """Iterate parent directories to discover a repository. 

1250 

1251 Return a Repo object for the first parent directory that looks like a 

1252 Git repository. 

1253 

1254 Args: 

1255 start: The directory to start discovery from (defaults to '.') 

1256 """ 

1257 remaining = True 

1258 path = os.path.abspath(start) 

1259 while remaining: 

1260 try: 

1261 return cls(path) 

1262 except NotGitRepository: 

1263 path, remaining = os.path.split(path) 

1264 raise NotGitRepository( 

1265 "No git repository was found at {path}".format(**dict(path=start)) 

1266 ) 

1267 
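# Usage sketch (illustrative only; assumes the current directory is somewhere
# inside a work tree):
#
#     repo = Repo.discover()   # walks upwards from "." until a repository is found
#     print(repo.path)
#     repo.close()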

1268 def controldir(self): 

1269 """Return the path of the control directory.""" 

1270 return self._controldir 

1271 

1272 def commondir(self): 

1273 """Return the path of the common directory. 

1274 

1275 For a main working tree, it is identical to controldir(). 

1276 

1277 For a linked working tree, it is the control directory of the 

1278 main working tree. 

1279 """ 

1280 return self._commondir 

1281 

1282 def _determine_file_mode(self): 

1283 """Probe the file-system to determine whether permissions can be trusted. 

1284 

1285 Returns: True if permissions can be trusted, False otherwise. 

1286 """ 

1287 fname = os.path.join(self.path, ".probe-permissions") 

1288 with open(fname, "w") as f: 

1289 f.write("") 

1290 

1291 st1 = os.lstat(fname) 

1292 try: 

1293 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) 

1294 except PermissionError: 

1295 return False 

1296 st2 = os.lstat(fname) 

1297 

1298 os.unlink(fname) 

1299 

1300 mode_differs = st1.st_mode != st2.st_mode 

1301 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 

1302 

1303 return mode_differs and st2_has_exec 

1304 

1305 def _determine_symlinks(self): 

1306 """Probe the filesystem to determine whether symlinks can be created. 

1307 

1308 Returns: True if symlinks can be created, False otherwise. 

1309 """ 

1310 # TODO(jelmer): Actually probe disk / look at filesystem 

1311 return sys.platform != "win32" 

1312 

1313 def _put_named_file(self, path, contents): 

1314 """Write a file to the control dir with the given name and contents. 

1315 

1316 Args: 

1317 path: The path to the file, relative to the control dir. 

1318 contents: A string to write to the file. 

1319 """ 

1320 path = path.lstrip(os.path.sep) 

1321 with GitFile(os.path.join(self.controldir(), path), "wb") as f: 

1322 f.write(contents) 

1323 

1324 def _del_named_file(self, path): 

1325 try: 

1326 os.unlink(os.path.join(self.controldir(), path)) 

1327 except FileNotFoundError: 

1328 return 

1329 

1330 def get_named_file(self, path, basedir=None): 

1331 """Get a file from the control dir with a specific name. 

1332 

1333 Although the filename should be interpreted as a filename relative to 

1334 the control dir in a disk-based Repo, the object returned need not be 

1335 pointing to a file in that location. 

1336 

1337 Args: 

1338 path: The path to the file, relative to the control dir. 

1339 basedir: Optional argument that specifies an alternative to the 

1340 control dir. 

1341 Returns: An open file object, or None if the file does not exist. 

1342 """ 

1343 # TODO(dborowitz): sanitize filenames, since this is used directly by 

1344 # the dumb web serving code. 

1345 if basedir is None: 

1346 basedir = self.controldir() 

1347 path = path.lstrip(os.path.sep) 

1348 try: 

1349 return open(os.path.join(basedir, path), "rb") 

1350 except FileNotFoundError: 

1351 return None 

1352 

1353 def index_path(self): 

1354 """Return path to the index file.""" 

1355 return os.path.join(self.controldir(), INDEX_FILENAME) 

1356 

1357 def open_index(self) -> "Index": 

1358 """Open the index for this repository. 

1359 

1360 Raises: 

1361 NoIndexPresent: If no index is present 

1362 Returns: The matching `Index` 

1363 """ 

1364 from .index import Index 

1365 

1366 if not self.has_index(): 

1367 raise NoIndexPresent 

1368 return Index(self.index_path()) 

1369 

1370 def has_index(self): 

1371 """Check if an index is present.""" 

1372 # Bare repos must never have index files; non-bare repos may have a 

1373 # missing index file, which is treated as empty. 

1374 return not self.bare 

1375 

1376 def stage( 

1377 self, 

1378 fs_paths: Union[ 

1379 str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]] 

1380 ], 

1381 ) -> None: 

1382 """Stage a set of paths. 

1383 

1384 Args: 

1385 fs_paths: List of paths, relative to the repository path 

1386 """ 

1387 root_path_bytes = os.fsencode(self.path) 

1388 

1389 if isinstance(fs_paths, (str, bytes, os.PathLike)): 

1390 fs_paths = [fs_paths] 

1391 fs_paths = list(fs_paths) 

1392 

1393 from .index import ( 

1394 _fs_to_tree_path, 

1395 blob_from_path_and_stat, 

1396 index_entry_from_directory, 

1397 index_entry_from_stat, 

1398 ) 

1399 

1400 index = self.open_index() 

1401 blob_normalizer = self.get_blob_normalizer() 

1402 for fs_path in fs_paths: 

1403 if not isinstance(fs_path, bytes): 

1404 fs_path = os.fsencode(fs_path) 

1405 if os.path.isabs(fs_path): 

1406 raise ValueError( 

1407 f"path {fs_path!r} should be relative to " 

1408 "repository root, not absolute" 

1409 ) 

1410 tree_path = _fs_to_tree_path(fs_path) 

1411 full_path = os.path.join(root_path_bytes, fs_path) 

1412 try: 

1413 st = os.lstat(full_path) 

1414 except OSError: 

1415 # File no longer exists 

1416 try: 

1417 del index[tree_path] 

1418 except KeyError: 

1419 pass # already removed 

1420 else: 

1421 if stat.S_ISDIR(st.st_mode): 

1422 entry = index_entry_from_directory(st, full_path) 

1423 if entry: 

1424 index[tree_path] = entry 

1425 else: 

1426 try: 

1427 del index[tree_path] 

1428 except KeyError: 

1429 pass 

1430 elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

1431 try: 

1432 del index[tree_path] 

1433 except KeyError: 

1434 pass 

1435 else: 

1436 blob = blob_from_path_and_stat(full_path, st) 

1437 blob = blob_normalizer.checkin_normalize(blob, fs_path) 

1438 self.object_store.add_object(blob) 

1439 index[tree_path] = index_entry_from_stat(st, blob.id) 

1440 index.write() 

1441 
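# Usage sketch (illustrative only; the temporary directory and identities are
# assumptions): writing a file, staging it, and committing the result.
#
#     import tempfile
#     repo = Repo.init(tempfile.mkdtemp())
#     with open(os.path.join(repo.path, "README"), "wb") as f:
#         f.write(b"hello\n")
#     repo.stage(["README"])
#     repo.do_commit(
#         b"add README",
#         committer=b"Example Committer <committer@example.com>",
#         author=b"Example Author <author@example.com>",
#     )
#     repo.close()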

1442 def unstage(self, fs_paths: List[str]): 

1443 """Unstage specific file in the index 

1444 Args: 

1445 fs_paths: a list of files to unstage, 

1446 relative to the repository path. 

1447 """ 

1448 from .index import IndexEntry, _fs_to_tree_path 

1449 

1450 index = self.open_index() 

1451 try: 

1452 tree_id = self[b"HEAD"].tree 

1453 except KeyError: 

1454 # no head means there are no commits in the repo 

1455 for fs_path in fs_paths: 

1456 tree_path = _fs_to_tree_path(fs_path) 

1457 del index[tree_path] 

1458 index.write() 

1459 return 

1460 

1461 for fs_path in fs_paths: 

1462 tree_path = _fs_to_tree_path(fs_path) 

1463 try: 

1464 tree = self.object_store[tree_id] 

1465 assert isinstance(tree, Tree) 

1466 tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path) 

1467 except KeyError: 

1468 # if tree_entry didn't exist, this file was being added, so 

1469 # remove index entry 

1470 try: 

1471 del index[tree_path] 

1472 continue 

1473 except KeyError as exc: 

1474 raise KeyError(f"file '{tree_path.decode()}' not in index") from exc 

1475 

1476 st = None 

1477 try: 

1478 st = os.lstat(os.path.join(self.path, fs_path)) 

1479 except FileNotFoundError: 

1480 pass 

1481 

1482 index_entry = IndexEntry( 

1483 ctime=(self[b"HEAD"].commit_time, 0), 

1484 mtime=(self[b"HEAD"].commit_time, 0), 

1485 dev=st.st_dev if st else 0, 

1486 ino=st.st_ino if st else 0, 

1487 mode=tree_entry[0], 

1488 uid=st.st_uid if st else 0, 

1489 gid=st.st_gid if st else 0, 

1490 size=len(self[tree_entry[1]].data), 

1491 sha=tree_entry[1], 

1492 ) 

1493 

1494 index[tree_path] = index_entry 

1495 index.write() 

1496 

1497 def clone( 

1498 self, 

1499 target_path, 

1500 *, 

1501 mkdir=True, 

1502 bare=False, 

1503 origin=b"origin", 

1504 checkout=None, 

1505 branch=None, 

1506 progress=None, 

1507 depth=None, 

1508 symlinks=None, 

1509 ) -> "Repo": 

1510 """Clone this repository. 

1511 

1512 Args: 

1513 target_path: Target path 

1514 mkdir: Create the target directory 

1515 bare: Whether to create a bare repository 

1516 checkout: Whether or not to check-out HEAD after cloning 

1517 origin: Base name for refs in target repository 

1518 cloned from this repository 

1519 branch: Optional branch or tag to be used as HEAD in the new repository 

1520 instead of this repository's HEAD. 

1521 progress: Optional progress function 

1522 depth: Depth at which to fetch 

1523 symlinks: Symlinks setting (default to autodetect) 

1524 Returns: Created repository as `Repo` 

1525 """ 

1526 encoded_path = os.fsencode(self.path) 

1527 

1528 if mkdir: 

1529 os.mkdir(target_path) 

1530 

1531 try: 

1532 if not bare: 

1533 target = Repo.init(target_path, symlinks=symlinks) 

1534 if checkout is None: 

1535 checkout = True 

1536 else: 

1537 if checkout: 

1538 raise ValueError("checkout and bare are incompatible") 

1539 target = Repo.init_bare(target_path) 

1540 

1541 try: 

1542 target_config = target.get_config() 

1543 target_config.set((b"remote", origin), b"url", encoded_path) 

1544 target_config.set( 

1545 (b"remote", origin), 

1546 b"fetch", 

1547 b"+refs/heads/*:refs/remotes/" + origin + b"/*", 

1548 ) 

1549 target_config.write_to_path() 

1550 

1551 ref_message = b"clone: from " + encoded_path 

1552 self.fetch(target, depth=depth) 

1553 target.refs.import_refs( 

1554 b"refs/remotes/" + origin, 

1555 self.refs.as_dict(b"refs/heads"), 

1556 message=ref_message, 

1557 ) 

1558 target.refs.import_refs( 

1559 b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message 

1560 ) 

1561 

1562 head_chain, origin_sha = self.refs.follow(b"HEAD") 

1563 origin_head = head_chain[-1] if head_chain else None 

1564 if origin_sha and not origin_head: 

1565 # set detached HEAD 

1566 target.refs[b"HEAD"] = origin_sha 

1567 else: 

1568 _set_origin_head(target.refs, origin, origin_head) 

1569 head_ref = _set_default_branch( 

1570 target.refs, origin, origin_head, branch, ref_message 

1571 ) 

1572 

1573 # Update target head 

1574 if head_ref: 

1575 head = _set_head(target.refs, head_ref, ref_message) 

1576 else: 

1577 head = None 

1578 

1579 if checkout and head is not None: 

1580 target.reset_index() 

1581 except BaseException: 

1582 target.close() 

1583 raise 

1584 except BaseException: 

1585 if mkdir: 

1586 import shutil 

1587 

1588 shutil.rmtree(target_path) 

1589 raise 

1590 return target 

1591 
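# Usage sketch (illustrative only; both paths are placeholders): cloning a local
# repository into a new working tree.
#
#     with Repo("/path/to/source") as source:
#         target = source.clone("/path/to/new-clone", mkdir=True)
#         try:
#             print(target.head())
#         finally:
#             target.close()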

1592 def reset_index(self, tree: Optional[bytes] = None): 

1593 """Reset the index back to a specific tree. 

1594 

1595 Args: 

1596 tree: Tree SHA to reset to, None for current HEAD tree. 

1597 """ 

1598 from .index import ( 

1599 build_index_from_tree, 

1600 symlink, 

1601 validate_path_element_default, 

1602 validate_path_element_ntfs, 

1603 ) 

1604 

1605 if tree is None: 

1606 head = self[b"HEAD"] 

1607 if isinstance(head, Tag): 

1608 _cls, obj = head.object 

1609 head = self.get_object(obj) 

1610 tree = head.tree 

1611 config = self.get_config() 

1612 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt") 

1613 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"): 

1614 validate_path_element = validate_path_element_ntfs 

1615 else: 

1616 validate_path_element = validate_path_element_default 

1617 if config.get_boolean(b"core", b"symlinks", True): 

1618 symlink_fn = symlink 

1619 else: 

1620 

1621 def symlink_fn(source, target): # type: ignore 

1622 with open( 

1623 target, "w" + ("b" if isinstance(source, bytes) else "") 

1624 ) as f: 

1625 f.write(source) 

1626 

1627 return build_index_from_tree( 

1628 self.path, 

1629 self.index_path(), 

1630 self.object_store, 

1631 tree, 

1632 honor_filemode=honor_filemode, 

1633 validate_path_element=validate_path_element, 

1634 symlink_fn=symlink_fn, 

1635 ) 

1636 

1637 def get_worktree_config(self) -> "ConfigFile": 

1638 from .config import ConfigFile 

1639 

1640 path = os.path.join(self.commondir(), "config.worktree") 

1641 try: 

1642 return ConfigFile.from_path(path) 

1643 except FileNotFoundError: 

1644 cf = ConfigFile() 

1645 cf.path = path 

1646 return cf 

1647 

1648 def get_config(self) -> "ConfigFile": 

1649 """Retrieve the config object. 

1650 

1651 Returns: `ConfigFile` object for the ``.git/config`` file. 

1652 """ 

1653 from .config import ConfigFile 

1654 

1655 path = os.path.join(self._commondir, "config") 

1656 try: 

1657 return ConfigFile.from_path(path) 

1658 except FileNotFoundError: 

1659 ret = ConfigFile() 

1660 ret.path = path 

1661 return ret 

1662 

1663 def get_description(self): 

1664 """Retrieve the description of this repository. 

1665 

1666 Returns: A string describing the repository or None. 

1667 """ 

1668 path = os.path.join(self._controldir, "description") 

1669 try: 

1670 with GitFile(path, "rb") as f: 

1671 return f.read() 

1672 except FileNotFoundError: 

1673 return None 

1674 

1675 def __repr__(self) -> str: 

1676 return f"<Repo at {self.path!r}>" 

1677 

1678 def set_description(self, description): 

1679 """Set the description for this repository. 

1680 

1681 Args: 

1682 description: Text to set as description for this repository. 

1683 """ 

1684 self._put_named_file("description", description) 

1685 

1686 @classmethod 

1687 def _init_maybe_bare( 

1688 cls, 

1689 path, 

1690 controldir, 

1691 bare, 

1692 object_store=None, 

1693 config=None, 

1694 default_branch=None, 

1695 symlinks: Optional[bool] = None, 

1696 ): 

1697 for d in BASE_DIRECTORIES: 

1698 os.mkdir(os.path.join(controldir, *d)) 

1699 if object_store is None: 

1700 object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR)) 

1701 ret = cls(path, bare=bare, object_store=object_store) 

1702 if default_branch is None: 

1703 if config is None: 

1704 from .config import StackedConfig 

1705 

1706 config = StackedConfig.default() 

1707 try: 

1708 default_branch = config.get("init", "defaultBranch") 

1709 except KeyError: 

1710 default_branch = DEFAULT_BRANCH 

1711 ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch) 

1712 ret._init_files(bare=bare, symlinks=symlinks) 

1713 return ret 

1714 

1715 @classmethod 

1716 def init( 

1717 cls, 

1718 path: str, 

1719 *, 

1720 mkdir: bool = False, 

1721 config=None, 

1722 default_branch=None, 

1723 symlinks: Optional[bool] = None, 

1724 ) -> "Repo": 

1725 """Create a new repository. 

1726 

1727 Args: 

1728 path: Path in which to create the repository 

1729 mkdir: Whether to create the directory 

1730 Returns: `Repo` instance 

1731 """ 

1732 if mkdir: 

1733 os.mkdir(path) 

1734 controldir = os.path.join(path, CONTROLDIR) 

1735 os.mkdir(controldir) 

1736 _set_filesystem_hidden(controldir) 

1737 return cls._init_maybe_bare( 

1738 path, 

1739 controldir, 

1740 False, 

1741 config=config, 

1742 default_branch=default_branch, 

1743 symlinks=symlinks, 

1744 ) 

1745 

1746 @classmethod 

1747 def _init_new_working_directory(cls, path, main_repo, identifier=None, mkdir=False): 

1748 """Create a new working directory linked to a repository. 

1749 

1750 Args: 

1751 path: Path in which to create the working tree. 

1752 main_repo: Main repository to reference 

1753 identifier: Worktree identifier 

1754 mkdir: Whether to create the directory 

1755 Returns: `Repo` instance 

1756 """ 

1757 if mkdir: 

1758 os.mkdir(path) 

1759 if identifier is None: 

1760 identifier = os.path.basename(path) 

1761 main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES) 

1762 worktree_controldir = os.path.join(main_worktreesdir, identifier) 

1763 gitdirfile = os.path.join(path, CONTROLDIR) 

1764 with open(gitdirfile, "wb") as f: 

1765 f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n") 

1766 try: 

1767 os.mkdir(main_worktreesdir) 

1768 except FileExistsError: 

1769 pass 

1770 try: 

1771 os.mkdir(worktree_controldir) 

1772 except FileExistsError: 

1773 pass 

1774 with open(os.path.join(worktree_controldir, GITDIR), "wb") as f: 

1775 f.write(os.fsencode(gitdirfile) + b"\n") 

1776 with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f: 

1777 f.write(b"../..\n") 

1778 with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f: 

1779 f.write(main_repo.head() + b"\n") 

1780 r = cls(path) 

1781 r.reset_index() 

1782 return r 

1783 

1784 @classmethod 

1785 def init_bare( 

1786 cls, path, *, mkdir=False, object_store=None, config=None, default_branch=None 

1787 ): 

1788 """Create a new bare repository. 

1789 

1790 ``path`` should already exist and be an empty directory. 

1791 

1792 Args: 

1793 path: Path to create bare repository in 

1794 Returns: a `Repo` instance 

1795 """ 

1796 if mkdir: 

1797 os.mkdir(path) 

1798 return cls._init_maybe_bare( 

1799 path, 

1800 path, 

1801 True, 

1802 object_store=object_store, 

1803 config=config, 

1804 default_branch=default_branch, 

1805 ) 

1806 

1807 create = init_bare 

1808 
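# Usage sketch (illustrative only; the temporary directories are assumptions):
# creating a normal repository and a bare repository.
#
#     import tempfile
#     work = Repo.init(tempfile.mkdtemp())       # working tree plus .git/ control dir
#     bare = Repo.init_bare(tempfile.mkdtemp())  # control files at the root, no worktree
#     assert not work.bare and bare.bare
#     work.close()
#     bare.close()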

1809 def close(self): 

1810 """Close any files opened by this repository.""" 

1811 self.object_store.close() 

1812 

1813 def __enter__(self): 

1814 return self 

1815 

1816 def __exit__(self, exc_type, exc_val, exc_tb): 

1817 self.close() 

1818 

1819 def get_blob_normalizer(self): 

1820 """Return a BlobNormalizer object.""" 

1821 # TODO Parse the git attributes files 

1822 git_attributes = {} 

1823 config_stack = self.get_config_stack() 

1824 try: 

1825 tree = self.object_store[self.refs[b"HEAD"]].tree 

1826 return TreeBlobNormalizer( 

1827 config_stack, 

1828 git_attributes, 

1829 self.object_store, 

1830 tree, 

1831 ) 

1832 except KeyError: 

1833 return BlobNormalizer(config_stack, git_attributes) 

1834 

1835 

1836class MemoryRepo(BaseRepo): 

1837 """Repo that stores refs, objects, and named files in memory. 

1838 

1839 MemoryRepos are always bare: they have no working tree and no index, since 

1840 those have a stronger dependency on the filesystem. 

1841 """ 

1842 

1843 def __init__(self) -> None: 

1844 from .config import ConfigFile 

1845 

1846 self._reflog: List[Any] = [] 

1847 refs_container = DictRefsContainer({}, logger=self._append_reflog) 

1848 BaseRepo.__init__(self, MemoryObjectStore(), refs_container) # type: ignore 

1849 self._named_files: Dict[str, bytes] = {} 

1850 self.bare = True 

1851 self._config = ConfigFile() 

1852 self._description = None 

1853 

1854 def _append_reflog(self, *args): 

1855 self._reflog.append(args) 

1856 

1857 def set_description(self, description): 

1858 self._description = description 

1859 

1860 def get_description(self): 

1861 return self._description 

1862 

1863 def _determine_file_mode(self): 

1864 """Probe the file-system to determine whether permissions can be trusted. 

1865 

1866 Returns: True if permissions can be trusted, False otherwise. 

1867 """ 

1868 return sys.platform != "win32" 

1869 

1870 def _determine_symlinks(self): 

1871 """Probe the file-system to determine whether permissions can be trusted. 

1872 

1873 Returns: True if permissions can be trusted, False otherwise. 

1874 """ 

1875 return sys.platform != "win32" 

1876 

1877 def _put_named_file(self, path, contents): 

1878 """Write a file to the control dir with the given name and contents. 

1879 

1880 Args: 

1881 path: The path to the file, relative to the control dir. 

1882 contents: A string to write to the file. 

1883 """ 

1884 self._named_files[path] = contents 

1885 

1886 def _del_named_file(self, path): 

1887 try: 

1888 del self._named_files[path] 

1889 except KeyError: 

1890 pass 

1891 

1892 def get_named_file(self, path, basedir=None): 

1893 """Get a file from the control dir with a specific name. 

1894 

1895 Although the filename should be interpreted as a filename relative to 

1896 the control dir in a disk-based Repo, the object returned need not be 

1897 pointing to a file in that location. 

1898 

1899 Args: 

1900 path: The path to the file, relative to the control dir. 

1901 Returns: An open file object, or None if the file does not exist. 

1902 """ 

1903 contents = self._named_files.get(path, None) 

1904 if contents is None: 

1905 return None 

1906 return BytesIO(contents) 

1907 

1908 def open_index(self): 

1909 """Fail to open index for this repo, since it is bare. 

1910 

1911 Raises: 

1912 NoIndexPresent: Raised when no index is present 

1913 """ 

1914 raise NoIndexPresent 

1915 

1916 def get_config(self): 

1917 """Retrieve the config object. 

1918 

1919 Returns: `ConfigFile` object. 

1920 """ 

1921 return self._config 

1922 

1923 @classmethod 

1924 def init_bare(cls, objects, refs): 

1925 """Create a new bare repository in memory. 

1926 

1927 Args: 

1928 objects: Objects for the new repository, 

1929 as iterable 

1930 refs: Refs as dictionary, mapping names 

1931 to object SHA1s 

1932 """ 

1933 ret = cls() 

1934 for obj in objects: 

1935 ret.object_store.add_object(obj) 

1936 for refname, sha in refs.items(): 

1937 ret.refs.add_if_new(refname, sha) 

1938 ret._init_files(bare=True) 

1939 return ret
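
# Example (illustrative only): building an in-memory repository from a blob,
# handy for tests that should not touch the filesystem.
def _example_memory_repo():
    blob = Blob.from_string(b"example content")
    repo = MemoryRepo.init_bare([blob], {})
    assert repo[blob.id].data == b"example content"
    return repo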