Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/index.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

499 statements  

1# index.py -- File parser/writer for the git index file 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

5# General Public License as public by the Free Software Foundation; version 2.0 

6# or (at your option) any later version. You can redistribute it and/or 

7# modify it under the terms of either of these two licenses. 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15# You should have received a copy of the licenses; if not, see 

16# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

17# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

18# License, Version 2.0. 

19# 

20 

21"""Parser for the git index file format.""" 

22 

23import os 

24import stat 

25import struct 

26import sys 

27from dataclasses import dataclass 

28from enum import Enum 

29from typing import ( 

30 Any, 

31 BinaryIO, 

32 Callable, 

33 Dict, 

34 Iterable, 

35 Iterator, 

36 List, 

37 Optional, 

38 Tuple, 

39 Union, 

40) 

41 

42from .file import GitFile 

43from .object_store import iter_tree_contents 

44from .objects import ( 

45 S_IFGITLINK, 

46 S_ISGITLINK, 

47 Blob, 

48 ObjectID, 

49 Tree, 

50 hex_to_sha, 

51 sha_to_hex, 

52) 

53from .pack import ObjectContainer, SHA1Reader, SHA1Writer 

54 

# 2-bit stage (during merge); stored in bits 12-13 of an entry's flags word.
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
# Low 12 bits of the flags word carry the length of the entry name.
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

# Index format version written when the caller does not request one explicitly.
DEFAULT_VERSION = 2

73 

74 

class Stage(Enum):
    """Merge stage of an index entry, as encoded in bits 12-13 of its flags."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3

80 

81 

@dataclass
class SerializedIndexEntry:
    """An index entry in its on-disk shape.

    Unlike ``IndexEntry``, this carries the entry name and the raw
    flags / extended-flags words read from (or written to) the index file.
    """

    name: bytes
    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Return the merge stage encoded in the flags word."""
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

99 

100 

@dataclass
class IndexEntry:
    """In-memory index entry: stat information plus object sha.

    Unlike ``SerializedIndexEntry`` this has no name or flag words; the
    name is the key under which the entry is stored in the index dict.
    """

    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Build an IndexEntry from a SerializedIndexEntry, dropping
        name and flag words."""
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Convert back to a SerializedIndexEntry for *name*, encoding
        *stage* into the flags word (extended flags are reset to zero)."""
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=stage.value << FLAG_STAGESHIFT,
            extended_flags=0,
        )

142 

143 

class ConflictedIndexEntry:
    """Index entry that represents a conflict.

    Holds up to three stage entries for a single path: the common
    ancestor, "this" side and the "other" side of the merge; any of them
    may be absent (None).
    """

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other

160 

161 

class UnmergedEntries(Exception):
    """Unmerged entries exist in the index.

    Raised by operations that require a fully merged index when a
    ConflictedIndexEntry is encountered.
    """

164 

165 

def pathsplit(path: bytes) -> Tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    # rpartition yields (b"", b"", path) when there is no separator, which
    # collapses to the desired (b"", path) result without special-casing.
    dirname, _, basename = path.rpartition(b"/")
    return (dirname, basename)

181 

182 

def pathjoin(*args):
    """Join a /-delimited path, skipping empty components."""
    return b"/".join(filter(None, args))

186 

187 

def read_cache_time(f):
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    # Two big-endian unsigned 32-bit values: seconds, then nanoseconds.
    raw = f.read(8)
    return struct.unpack(">LL", raw)

197 

198 

def write_cache_time(f, t):
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, tuple):
        pair = t
    elif isinstance(t, int):
        pair = (t, 0)
    elif isinstance(t, float):
        # Split into whole seconds and a nanosecond remainder.
        whole, frac = divmod(t, 1.0)
        pair = (int(whole), int(frac * 1000000000))
    else:
        raise TypeError(t)
    f.write(struct.pack(">LL", *pair))

214 

215 

def read_cache_entry(f, version: int) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index format version of the file being read
    Returns:
      A SerializedIndexEntry; the sha is returned hex-encoded and the
      name-length bits are stripped from the flags word.
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0
    # The low 12 bits of the flags word carry the name length.
    name = f.read(flags & FLAG_NAMEMASK)
    # Padding:
    if version < 4:
        # Entries are NUL-padded to a multiple of 8 bytes (with at least
        # one padding byte) in versions < 4; skip past the padding.
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())
    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )

260 

261 

def write_cache_entry(f, entry: SerializedIndexEntry, version: int) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version to write
    Raises:
      AssertionError: if extended flags are used with version < 3
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)
    # Name length lives in the low 12 bits of the flags word.
    flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            # Like dev/ino, the on-disk size is a 32-bit field; truncate
            # so entries for files >= 4 GiB do not make struct.pack raise.
            entry.size & 0xFFFFFFFF,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))
    f.write(entry.name)
    if version < 4:
        # NUL-pad the entry to a multiple of 8 bytes (at least one byte).
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))

296 

297 

class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version) -> None:
        # Version number found in the index file header.
        self.index_format_version = version

303 

304 

def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    magic = f.read(4)
    if magic != b"DIRC":
        raise AssertionError(f"Invalid index file header: {magic!r}")
    version, num_entries = struct.unpack(b">LL", f.read(8))
    if version not in (1, 2, 3):
        raise UnsupportedIndexFormat(version)
    for _ in range(num_entries):
        yield read_cache_entry(f, version)

315 

316 

def read_index_dict(f) -> Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Entries are keyed by path; the up-to-three merge-conflict stages for
    a path are collapsed into a single ConflictedIndexEntry value.

    Args:
      f: File object to read from
    """
    ret: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            # Conflicted path: accumulate the stages on one record.
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret

340 

341 

def write_index(
    f: BinaryIO, entries: List[SerializedIndexEntry], version: Optional[int] = None
):
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write; defaults to DEFAULT_VERSION
      entries: Iterable over the entries to write
    """
    actual_version = DEFAULT_VERSION if version is None else version
    # Header: magic, format version, entry count.
    f.write(b"DIRC" + struct.pack(b">LL", actual_version, len(entries)))
    for entry in entries:
        write_cache_entry(f, entry, actual_version)

358 

359 

def write_index_dict(
    f: BinaryIO,
    entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are written sorted by path; a ConflictedIndexEntry expands to
    one serialized entry per present stage (ancestor, this, other).
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))
    write_index(f, entries_list, version=version)

387 

388 

def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    # Regular file: normalize permissions to 0644, or 0755 when the
    # owner-executable bit is set.
    result = stat.S_IFREG | 0o644
    if mode & 0o100:
        result |= 0o111
    return result

410 

411 

class Index:
    """A Git Index file."""

    # Maps path (bytes) to a normal entry or a conflict record.
    _byname: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(self, filename: Union[bytes, str], read=True) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
        """
        self._filename = filename
        # TODO(jelmer): Store the version returned by read_index
        self._version = None
        self.clear()
        if read:
            self.read()

    @property
    def path(self):
        """Path to the index file."""
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Wrap in SHA1Writer so a trailing checksum is appended.
            f = SHA1Writer(f)
            write_index_dict(f, self._byname, version=self._version)
        finally:
            f.close()

    def read(self):
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            # A missing index file is treated as an empty index.
            return
        f = GitFile(self._filename, "rb")
        try:
            f = SHA1Reader(f)
            self.update(read_index_dict(f))
            # FIXME: Additional data?
            # Skip any remaining data before the trailing 20-byte checksum,
            # then verify the checksum.
            f.read(os.path.getsize(self._filename) - f.tell() - 20)
            f.check_sha()
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key):
        """Check whether *key* (a path) has an entry in this index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[Tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check whether any entry in this index is conflicted."""
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self):
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs."""
        return iter(self._byname.items())

    def items(self) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs."""
        return iter(self._byname.items())

    def update(self, entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]):
        """Add or replace entries from *entries* in this index."""
        for key, value in entries.items():
            self[key] = value

    def paths(self):
        """Yield all paths in this index."""
        yield from self._byname.keys()

    def changes_from_tree(
        self, object_store, tree: ObjectID, want_unchanged: bool = False
    ):
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
          newmode), (oldsha, newsha)
        """

        def lookup_entry(path):
            entry = self[path]
            return entry.sha, cleanup_mode(entry.mode)

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store):
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

570 

571 

def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[Tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Nested dict structure mirroring the directory layout: a dict value
    # is a subtree, a (mode, sha) tuple value is a blob entry.
    trees: Dict[bytes, Any] = {b"": {}}

    def add_tree(path):
        # Return the dict for *path*, creating it (and all parents) on demand.
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path):
        # Recursively build and store Tree objects bottom-up, returning
        # the SHA of the tree created for *path*.
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")

614 

615 

def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())

626 

627 

def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], Tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged=False,
) -> Iterable[
    Tuple[
        Tuple[Optional[bytes], Optional[bytes]],
        Tuple[Optional[int], Optional[int]],
        Tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy;
        should raise KeyError for missing entries
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            # Name listed but no entry could be produced; skip it.
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))

676 

677 

def index_entry_from_stat(
    stat_val,
    hex_sha: bytes,
    mode: Optional[int] = None,
):
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional mode to record; defaults to a cleaned-up version of
        the mode from stat_val
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        stat_val.st_ctime,
        stat_val.st_mtime,
        stat_val.st_dev,
        stat_val.st_ino,
        mode,
        stat_val.st_uid,
        stat_val.st_gid,
        stat_val.st_size,
        hex_sha,
    )

703 

704 

if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """PermissionError carrying a hint about Windows developer mode."""

        def __init__(self, errno, msg, filename) -> None:
            super(PermissionError, self).__init__(
                errno,
                "Unable to create symlink; "
                f"do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(src, dst, target_is_directory=False, *, dir_fd=None):
        # Thin os.symlink wrapper that converts PermissionError into the
        # more descriptive WindowsSymlinkPermissionError.
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(e.errno, e.strerror, e.filename) from e
else:
    symlink = os.symlink

730 

731 

def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode=True,
    tree_encoding="utf-8",
    symlink_fn=None,
):
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode symlink targets on Windows
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            # Remove whatever currently exists before creating the link.
            os.unlink(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents = contents.decode(tree_encoding)  # type: ignore
            target_path = target_path.decode(tree_encoding)  # type: ignore
        (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            # Short-circuit: if the on-disk contents already match, keep
            # the existing file (and its stat information) untouched.
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)

779 

780 

# Path elements that must never be checked out (case-insensitively).
INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    """Return True if a single path element is safe to check out."""
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """NTFS-aware element check: also strips trailing dots/spaces and
    rejects the GIT~1 8.3 short name."""
    stripped = element.rstrip(b". ").lower()
    return stripped not in INVALID_DOTNAMES and stripped != b"git~1"


def validate_path(path: bytes, element_validator=validate_path_element_default) -> bool:
    """Default path validator that just checks for .git/."""
    return all(element_validator(p) for p in path.split(b"/"))

805 

806 

def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element=validate_path_element_default,
    symlink_fn=None,
):
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            # Skip paths considered unsafe to check out (e.g. ".git").
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            # Submodule: just create the directory, do not recurse into it.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()

884 

def blob_from_path_and_mode(fs_path: bytes, mode: int, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      mode: File mode
      tree_encoding: Encoding used for symlink targets on Windows
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        # Symlinks are stored as blobs whose data is the link target.
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob

905 

906 

def blob_from_path_and_stat(fs_path: bytes, st, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
      tree_encoding: Encoding used for symlink targets on Windows
    Returns: A `Blob` object
    """
    # Only the mode from the stat result is needed.
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)

916 

917 

def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    # Imported lazily to avoid an import cycle with dulwich.repo.
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        # Repository exists but has no HEAD reference.
        return None

940 

941 

def _has_directory_changed(tree_path: bytes, entry):
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it's has changed. If
    not, consider the file as changed as Git tracked a file and not a
    directory.

    Return true if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False

965 

966 

def get_unstaged_changes(
    index: Index, root_path: Union[str, bytes], filter_blob_callback=None
):
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callable invoked as
        ``filter_blob_callback(blob, tree_path)``; its return value is
        compared against the index entry instead of the raw blob
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Only regular files and symlinks can be compared with
                # index entries; skip sockets, fifos, etc.
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path

1009 

1010 

1011os_sep_bytes = os.sep.encode("ascii") 

1012 

1013 

1014def _tree_to_fs_path(root_path: bytes, tree_path: bytes): 

1015 """Convert a git tree path to a file system path. 

1016 

1017 Args: 

1018 root_path: Root filesystem path 

1019 tree_path: Git tree path as bytes 

1020 

1021 Returns: File system path. 

1022 """ 

1023 assert isinstance(tree_path, bytes) 

1024 if os_sep_bytes != b"/": 

1025 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

1026 else: 

1027 sep_corrected_path = tree_path 

1028 return os.path.join(root_path, sep_corrected_path) 

1029 

1030 

1031def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes: 

1032 """Convert a file system path to a git tree path. 

1033 

1034 Args: 

1035 fs_path: File system path. 

1036 

1037 Returns: Git tree path as bytes 

1038 """ 

1039 if not isinstance(fs_path, bytes): 

1040 fs_path_bytes = os.fsencode(fs_path) 

1041 else: 

1042 fs_path_bytes = fs_path 

1043 if os_sep_bytes != b"/": 

1044 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

1045 else: 

1046 tree_path = fs_path_bytes 

1047 return tree_path 

1048 

1049 

def index_entry_from_directory(st, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory, if it is a submodule.

    Returns a gitlink entry pointing at the submodule HEAD, or None when
    the directory is not a git repository or has no readable HEAD.
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None

1057 

1058 

def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. for directories and
    non-existent files it returns None

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        # Directories only produce entries when they are submodules.
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None

1086 

1087 

def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry; the entry is None for paths
      that are missing or are (plain) directories
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry

1108 

1109 

def iter_fresh_objects(
    paths: Iterable[bytes], root_path: bytes, include_deleted=False, object_store=None
) -> Iterator[Tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to check
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            # Missing on disk; only report if the caller asked for deletions.
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)

1128 

1129 

def refresh_index(index: Index, root_path: bytes):
    """Refresh the contents of an index.

    This is the equivalent to running 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    # Iterating the index yields its paths; entries that could not be
    # created from disk come back as None and are left untouched.
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry

1143 

class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self):
        # Acquire the lockfile first, then load the current index contents.
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            # Body raised: discard the lockfile, propagate the exception.
            self._file.abort()
            return
        try:
            f = SHA1Writer(self._file)
            write_index_dict(f, self._index._byname)
        except BaseException:
            # Abort the lockfile but re-raise: silently swallowing the
            # error would leave callers believing the index was written.
            self._file.abort()
            raise
        else:
            f.close()