Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%


1633 statements  

1# pack.py -- For dealing with packed git objects. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Classes for dealing with packed git objects. 

24 

25A pack is a compact representation of a bunch of objects, stored 

26using deltas where possible. 

27 

28A pack has two parts: the pack file, which stores the data, and an index 

29that tells you where the data is. 

30 

31To find an object you look in each of the index files until you find a 

32match for the object name. The index then gives you the offset of that 

33object within the corresponding pack file. 

34""" 

35 

36import binascii 

37from collections import defaultdict, deque 

38from contextlib import suppress 

39from io import BytesIO, UnsupportedOperation 

40 

41try: 

42 from cdifflib import CSequenceMatcher as SequenceMatcher 

43except ModuleNotFoundError: 

44 from difflib import SequenceMatcher 

45 

46import os 

47import struct 

48import sys 

49import warnings 

50import zlib 

51from collections.abc import Iterable, Iterator, Sequence, Set 

52from hashlib import sha1 

53from itertools import chain 

54from os import SEEK_CUR, SEEK_END 

55from struct import unpack_from 

56from types import TracebackType 

57from typing import ( 

58 IO, 

59 TYPE_CHECKING, 

60 Any, 

61 BinaryIO, 

62 Callable, 

63 Generic, 

64 Optional, 

65 Protocol, 

66 TypeVar, 

67 Union, 

68) 

69 

70try: 

71 import mmap 

72except ImportError: 

73 has_mmap = False 

74else: 

75 has_mmap = True 

76 

77if sys.version_info >= (3, 12): 

78 from collections.abc import Buffer 

79else: 

80 Buffer = Union[bytes, bytearray, memoryview] 

81 

82if TYPE_CHECKING: 

83 from _hashlib import HASH as HashObject 

84 

85 from .commit_graph import CommitGraph 

86 

87# For some reason the above try, except fails to set has_mmap = False for plan9 

88if sys.platform == "Plan9": 

89 has_mmap = False 

90 

91from . import replace_me 

92from .errors import ApplyDeltaError, ChecksumMismatch 

93from .file import GitFile, _GitFile 

94from .lru_cache import LRUSizeCache 

95from .objects import ObjectID, ShaFile, hex_to_sha, object_header, sha_to_hex 

96 

97OFS_DELTA = 6 

98REF_DELTA = 7 

99 

100DELTA_TYPES = (OFS_DELTA, REF_DELTA) 

101 

102 

103DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 

104 

105# Keep pack files under 16Mb in memory, otherwise write them out to disk 

106PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 

107 

108# Default pack index version to use when none is specified 

109DEFAULT_PACK_INDEX_VERSION = 2 

110 

111 

112OldUnpackedObject = Union[tuple[Union[bytes, int], list[bytes]], list[bytes]] 

113ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]] 

114ProgressFn = Callable[[int, str], None] 

115PackHint = tuple[int, Optional[bytes]] 

116 

117 

118class UnresolvedDeltas(Exception): 

119 """Delta objects could not be resolved.""" 

120 

121 def __init__(self, shas: list[bytes]) -> None: 

122 """Initialize UnresolvedDeltas exception. 

123 

124 Args: 

125 shas: List of SHA hashes for unresolved delta objects 

126 """ 

127 self.shas = shas 

128 

129 

130class ObjectContainer(Protocol): 

131 """Protocol for objects that can contain git objects.""" 

132 

133 def add_object(self, obj: ShaFile) -> None: 

134 """Add a single object to this object store.""" 

135 

136 def add_objects( 

137 self, 

138 objects: Sequence[tuple[ShaFile, Optional[str]]], 

139 progress: Optional[Callable[..., None]] = None, 

140 ) -> Optional["Pack"]: 

141 """Add a set of objects to this object store. 

142 

143 Args: 

144 objects: Iterable over a list of (object, path) tuples 

145 progress: Progress callback for object insertion 

146 Returns: Optional Pack object of the objects written. 

147 """ 

148 

149 def __contains__(self, sha1: bytes) -> bool: 

150 """Check if a hex sha is present.""" 

151 

152 def __getitem__(self, sha1: bytes) -> ShaFile: 

153 """Retrieve an object.""" 

154 

155 def get_commit_graph(self) -> Optional["CommitGraph"]: 

156 """Get the commit graph for this object store. 

157 

158 Returns: 

159 CommitGraph object if available, None otherwise 

160 """ 

161 return None 

162 

163 

164class PackedObjectContainer(ObjectContainer): 

165 """Container for objects packed in a pack file.""" 

166 

167 def get_unpacked_object( 

168 self, sha1: bytes, *, include_comp: bool = False 

169 ) -> "UnpackedObject": 

170 """Get a raw unresolved object. 

171 

172 Args: 

173 sha1: SHA-1 hash of the object 

174 include_comp: Whether to include compressed data 

175 

176 Returns: 

177 UnpackedObject instance 

178 """ 

179 raise NotImplementedError(self.get_unpacked_object) 

180 

181 def iterobjects_subset( 

182 self, shas: Iterable[bytes], *, allow_missing: bool = False 

183 ) -> Iterator[ShaFile]: 

184 """Iterate over a subset of objects. 

185 

186 Args: 

187 shas: Iterable of object SHAs to retrieve 

188 allow_missing: If True, skip missing objects 

189 

190 Returns: 

191 Iterator of ShaFile objects 

192 """ 

193 raise NotImplementedError(self.iterobjects_subset) 

194 

195 def iter_unpacked_subset( 

196 self, 

197 shas: Iterable[bytes], 

198 *, 

199 include_comp: bool = False, 

200 allow_missing: bool = False, 

201 convert_ofs_delta: bool = True, 

202 ) -> Iterator["UnpackedObject"]: 

203 """Iterate over unpacked objects from a subset of SHAs. 

204 

205 Args: 

206 shas: Set of object SHAs to retrieve 

207 include_comp: Include compressed data if True 

208 allow_missing: If True, skip missing objects 

209 convert_ofs_delta: If True, convert offset deltas to ref deltas 

210 

211 Returns: 

212 Iterator of UnpackedObject instances 

213 """ 

214 raise NotImplementedError(self.iter_unpacked_subset) 

215 

216 

217class UnpackedObjectStream: 

218 """Abstract base class for a stream of unpacked objects.""" 

219 

220 def __iter__(self) -> Iterator["UnpackedObject"]: 

221 """Iterate over unpacked objects.""" 

222 raise NotImplementedError(self.__iter__) 

223 

224 def __len__(self) -> int: 

225 """Return the number of objects in the stream.""" 

226 raise NotImplementedError(self.__len__) 

227 

228 

229def take_msb_bytes( 

230 read: Callable[[int], bytes], crc32: Optional[int] = None 

231) -> tuple[list[int], Optional[int]]: 

232 """Read bytes marked with most significant bit. 

233 

234 Args: 

235 read: Read function 

236 crc32: Optional CRC32 checksum to update 

237 

238 Returns: 

239 Tuple of (list of bytes read, updated CRC32 or None) 

240 """ 

241 ret: list[int] = [] 

242 while len(ret) == 0 or ret[-1] & 0x80: 

243 b = read(1) 

244 if crc32 is not None: 

245 crc32 = binascii.crc32(b, crc32) 

246 ret.append(ord(b[:1])) 

247 return ret, crc32 
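
# Illustrative example of the MSB-continuation convention handled above: 0x83
# has its high bit set, so another byte is read; 0x05 does not, so reading
# stops (values chosen purely for illustration).
#
#     >>> take_msb_bytes(BytesIO(bytes([0x83, 0x05])).read)
#     ([131, 5], None)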

248 

249 

250class PackFileDisappeared(Exception): 

251 """Raised when a pack file unexpectedly disappears.""" 

252 

253 def __init__(self, obj: object) -> None: 

254 """Initialize PackFileDisappeared exception. 

255 

256 Args: 

257 obj: The object that triggered the exception 

258 """ 

259 self.obj = obj 

260 

261 

262class UnpackedObject: 

263 """Class encapsulating an object unpacked from a pack file. 

264 

265 These objects should only be created from within unpack_object. Most 

266 members start out as empty and are filled in at various points by 

267 read_zlib_chunks, unpack_object, DeltaChainIterator, etc. 

268 

269 End users of this object should take care that the function they're getting 

270 this object from is guaranteed to set the members they need. 

271 """ 

272 

273 __slots__ = [ 

274 "_sha", # Cached binary SHA. 

275 "comp_chunks", # Compressed object chunks. 

276 "crc32", # CRC32. 

277 "decomp_chunks", # Decompressed object chunks. 

278 "decomp_len", # Decompressed length of this object. 

279 "delta_base", # Delta base offset or SHA. 

280 "obj_chunks", # Decompressed and delta-resolved chunks. 

281 "obj_type_num", # Type of this object. 

282 "offset", # Offset in its pack. 

283 "pack_type_num", # Type of this object in the pack (may be a delta). 

284 ] 

285 

286 obj_type_num: Optional[int] 

287 obj_chunks: Optional[list[bytes]] 

288 delta_base: Union[None, bytes, int] 

289 decomp_chunks: list[bytes] 

290 comp_chunks: Optional[list[bytes]] 

291 decomp_len: Optional[int] 

292 crc32: Optional[int] 

293 offset: Optional[int] 

294 pack_type_num: int 

295 _sha: Optional[bytes] 

296 

297 # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be 

298 # methods of this object. 

299 def __init__( 

300 self, 

301 pack_type_num: int, 

302 *, 

303 delta_base: Union[None, bytes, int] = None, 

304 decomp_len: Optional[int] = None, 

305 crc32: Optional[int] = None, 

306 sha: Optional[bytes] = None, 

307 decomp_chunks: Optional[list[bytes]] = None, 

308 offset: Optional[int] = None, 

309 ) -> None: 

310 """Initialize an UnpackedObject. 

311 

312 Args: 

313 pack_type_num: Type number of this object in the pack 

314 delta_base: Delta base (offset or SHA) if this is a delta object 

315 decomp_len: Decompressed length of this object 

316 crc32: CRC32 checksum 

317 sha: SHA-1 hash of the object 

318 decomp_chunks: Decompressed chunks 

319 offset: Offset in the pack file 

320 """ 

321 self.offset = offset 

322 self._sha = sha 

323 self.pack_type_num = pack_type_num 

324 self.delta_base = delta_base 

325 self.comp_chunks = None 

326 self.decomp_chunks: list[bytes] = decomp_chunks or [] 

327 if decomp_chunks is not None and decomp_len is None: 

328 self.decomp_len = sum(map(len, decomp_chunks)) 

329 else: 

330 self.decomp_len = decomp_len 

331 self.crc32 = crc32 

332 

333 if pack_type_num in DELTA_TYPES: 

334 self.obj_type_num = None 

335 self.obj_chunks = None 

336 else: 

337 self.obj_type_num = pack_type_num 

338 self.obj_chunks = self.decomp_chunks 

339 self.delta_base = delta_base 

340 

341 def sha(self) -> bytes: 

342 """Return the binary SHA of this object.""" 

343 if self._sha is None: 

344 assert self.obj_type_num is not None and self.obj_chunks is not None 

345 self._sha = obj_sha(self.obj_type_num, self.obj_chunks) 

346 return self._sha 

347 

348 def sha_file(self) -> ShaFile: 

349 """Return a ShaFile from this object.""" 

350 assert self.obj_type_num is not None and self.obj_chunks is not None 

351 return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks) 

352 

353 # Only provided for backwards compatibility with code that expects either 

354 # chunks or a delta tuple. 

355 def _obj(self) -> OldUnpackedObject: 

356 """Return the decompressed chunks, or (delta base, delta chunks).""" 

357 if self.pack_type_num in DELTA_TYPES: 

358 assert isinstance(self.delta_base, (bytes, int)) 

359 return (self.delta_base, self.decomp_chunks) 

360 else: 

361 return self.decomp_chunks 

362 

363 def __eq__(self, other: object) -> bool: 

364 """Check equality with another UnpackedObject.""" 

365 if not isinstance(other, UnpackedObject): 

366 return False 

367 for slot in self.__slots__: 

368 if getattr(self, slot) != getattr(other, slot): 

369 return False 

370 return True 

371 

372 def __ne__(self, other: object) -> bool: 

373 """Check inequality with another UnpackedObject.""" 

374 return not (self == other) 

375 

376 def __repr__(self) -> str: 

377 """Return string representation of this UnpackedObject.""" 

378 data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__] 

379 return "{}({})".format(self.__class__.__name__, ", ".join(data)) 

380 

381 

382_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance 

383 

384 

385def read_zlib_chunks( 

386 read_some: Callable[[int], bytes], 

387 unpacked: UnpackedObject, 

388 include_comp: bool = False, 

389 buffer_size: int = _ZLIB_BUFSIZE, 

390) -> bytes: 

391 """Read zlib data from a buffer. 

392 

393 This function requires that the buffer have additional data following the 

394 compressed data, which is guaranteed to be the case for git pack files. 

395 

396 Args: 

397 read_some: Read function that returns at least one byte, but may 

398 return less than the requested size. 

399 unpacked: An UnpackedObject to write result data to. If its crc32 

400 attr is not None, the CRC32 of the compressed bytes will be computed 

401 using this starting CRC32. 

402 After this function, will have the following attrs set: 

403 * comp_chunks (if include_comp is True) 

404 * decomp_chunks 

405 * decomp_len 

406 * crc32 

407 include_comp: If True, include compressed data in the result. 

408 buffer_size: Size of the read buffer. 

409 Returns: Leftover unused data from the decompression. 

410 

411 Raises: 

412 zlib.error: if a decompression error occurred. 

413 """ 

414 if unpacked.decomp_len is None or unpacked.decomp_len <= -1: 

415 raise ValueError("non-negative zlib data stream size expected") 

416 decomp_obj = zlib.decompressobj() 

417 

418 comp_chunks = [] 

419 decomp_chunks = unpacked.decomp_chunks 

420 decomp_len = 0 

421 crc32 = unpacked.crc32 

422 

423 while True: 

424 add = read_some(buffer_size) 

425 if not add: 

426 raise zlib.error("EOF before end of zlib stream") 

427 comp_chunks.append(add) 

428 decomp = decomp_obj.decompress(add) 

429 decomp_len += len(decomp) 

430 decomp_chunks.append(decomp) 

431 unused = decomp_obj.unused_data 

432 if unused: 

433 left = len(unused) 

434 if crc32 is not None: 

435 crc32 = binascii.crc32(add[:-left], crc32) 

436 if include_comp: 

437 comp_chunks[-1] = add[:-left] 

438 break 

439 elif crc32 is not None: 

440 crc32 = binascii.crc32(add, crc32) 

441 if crc32 is not None: 

442 crc32 &= 0xFFFFFFFF 

443 

444 if decomp_len != unpacked.decomp_len: 

445 raise zlib.error("decompressed data does not match expected size") 

446 

447 unpacked.crc32 = crc32 

448 if include_comp: 

449 unpacked.comp_chunks = comp_chunks 

450 return unused 

451 

452 

453def iter_sha1(iter: Iterable[bytes]) -> bytes: 

454 """Return the hexdigest of the SHA1 over a set of names. 

455 

456 Args: 

457 iter: Iterator over string objects 

458 Returns: 40-byte hex sha1 digest 

459 """ 

460 sha = sha1() 

461 for name in iter: 

462 sha.update(name) 

463 return sha.hexdigest().encode("ascii") 

464 

465 

466def load_pack_index(path: Union[str, os.PathLike[str]]) -> "PackIndex": 

467 """Load an index file by path. 

468 

469 Args: 

470 path: Path to the index file 

471 Returns: A PackIndex loaded from the given path 

472 """ 

473 with GitFile(path, "rb") as f: 

474 return load_pack_index_file(path, f) 

475 

476 

477def _load_file_contents( 

478 f: Union[IO[bytes], _GitFile], size: Optional[int] = None 

479) -> tuple[Union[bytes, Any], int]: 

480 """Load contents from a file, preferring mmap when possible. 

481 

482 Args: 

483 f: File-like object to load 

484 size: Expected size, or None to determine from file 

485 Returns: Tuple of (contents, size) 

486 """ 

487 try: 

488 fd = f.fileno() 

489 except (UnsupportedOperation, AttributeError): 

490 fd = None 

491 # Attempt to use mmap if possible 

492 if fd is not None: 

493 if size is None: 

494 size = os.fstat(fd).st_size 

495 if has_mmap: 

496 try: 

497 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ) 

498 except (OSError, ValueError): 

499 # Can't mmap - perhaps a socket or invalid file descriptor 

500 pass 

501 else: 

502 return contents, size 

503 contents_bytes = f.read() 

504 size = len(contents_bytes) 

505 return contents_bytes, size 

506 

507 

508def load_pack_index_file( 

509 path: Union[str, os.PathLike[str]], f: Union[IO[bytes], _GitFile] 

510) -> "PackIndex": 

511 """Load an index file from a file-like object. 

512 

513 Args: 

514 path: Path for the index file 

515 f: File-like object 

516 Returns: A PackIndex loaded from the given file 

517 """ 

518 contents, size = _load_file_contents(f) 

519 if contents[:4] == b"\377tOc": 

520 version = struct.unpack(b">L", contents[4:8])[0] 

521 if version == 2: 

522 return PackIndex2(path, file=f, contents=contents, size=size) 

523 elif version == 3: 

524 return PackIndex3(path, file=f, contents=contents, size=size) 

525 else: 

526 raise KeyError(f"Unknown pack index format {version}") 

527 else: 

528 return PackIndex1(path, file=f, contents=contents, size=size) 

529 

530 

531def bisect_find_sha( 

532 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes] 

533) -> Optional[int]: 

534 """Find a SHA in a data blob with sorted SHAs. 

535 

536 Args: 

537 start: Start index of range to search 

538 end: End index of range to search 

539 sha: Sha to find 

540 unpack_name: Callback to retrieve SHA by index 

541 Returns: Index of the SHA, or None if it wasn't found 

542 """ 

543 assert start <= end 

544 while start <= end: 

545 i = (start + end) // 2 

546 file_sha = unpack_name(i) 

547 if file_sha < sha: 

548 start = i + 1 

549 elif file_sha > sha: 

550 end = i - 1 

551 else: 

552 return i 

553 return None 
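
# Illustrative example with toy 20-byte SHAs (note that ``end`` is inclusive):
#
#     >>> names = [b"\x01" * 20, b"\x05" * 20, b"\x09" * 20]
#     >>> bisect_find_sha(0, len(names) - 1, b"\x05" * 20, lambda i: names[i])
#     1
#     >>> bisect_find_sha(0, len(names) - 1, b"\x07" * 20, lambda i: names[i]) is None
#     True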

554 

555 

556PackIndexEntry = tuple[bytes, int, Optional[int]] 

557 

558 

559class PackIndex: 

560 """An index in to a packfile. 

561 

562 Given a sha id of an object a pack index can tell you the location in the 

563 packfile of that object if it has it. 

564 """ 

565 

566 # Default to SHA-1 for backward compatibility 

567 hash_algorithm = 1 

568 hash_size = 20 

569 

570 def __eq__(self, other: object) -> bool: 

571 """Check equality with another PackIndex.""" 

572 if not isinstance(other, PackIndex): 

573 return False 

574 

575 for (name1, _, _), (name2, _, _) in zip( 

576 self.iterentries(), other.iterentries() 

577 ): 

578 if name1 != name2: 

579 return False 

580 return True 

581 

582 def __ne__(self, other: object) -> bool: 

583 """Check if this pack index is not equal to another.""" 

584 return not self.__eq__(other) 

585 

586 def __len__(self) -> int: 

587 """Return the number of entries in this pack index.""" 

588 raise NotImplementedError(self.__len__) 

589 

590 def __iter__(self) -> Iterator[bytes]: 

591 """Iterate over the SHAs in this pack.""" 

592 return map(sha_to_hex, self._itersha()) 

593 

594 def iterentries(self) -> Iterator[PackIndexEntry]: 

595 """Iterate over the entries in this pack index. 

596 

597 Returns: iterator over tuples with object name, offset in packfile and 

598 crc32 checksum. 

599 """ 

600 raise NotImplementedError(self.iterentries) 

601 

602 def get_pack_checksum(self) -> Optional[bytes]: 

603 """Return the SHA1 checksum stored for the corresponding packfile. 

604 

605 Returns: 20-byte binary digest, or None if not available 

606 """ 

607 raise NotImplementedError(self.get_pack_checksum) 

608 

609 @replace_me(since="0.21.0", remove_in="0.23.0") 

610 def object_index(self, sha: bytes) -> int: 

611 """Return the index for the given SHA. 

612 

613 Args: 

614 sha: SHA-1 hash 

615 

616 Returns: 

617 Index position 

618 """ 

619 return self.object_offset(sha) 

620 

621 def object_offset(self, sha: bytes) -> int: 

622 """Return the offset in to the corresponding packfile for the object. 

623 

624 Given the name of an object it will return the offset that object 

625 lives at within the corresponding pack file. If the pack file doesn't 

626 have the object then None will be returned. 

627 """ 

628 raise NotImplementedError(self.object_offset) 

629 

630 def object_sha1(self, index: int) -> bytes: 

631 """Return the SHA1 corresponding to the index in the pack file.""" 

632 for name, offset, _crc32 in self.iterentries(): 

633 if offset == index: 

634 return name 

635 else: 

636 raise KeyError(index) 

637 

638 def _object_offset(self, sha: bytes) -> int: 

639 """See object_offset. 

640 

641 Args: 

642 sha: A *binary* SHA string (20 bytes long). 

643 """ 

644 raise NotImplementedError(self._object_offset) 

645 

646 def objects_sha1(self) -> bytes: 

647 """Return the hex SHA1 over all the shas of all objects in this pack. 

648 

649 Note: This is used for the filename of the pack. 

650 """ 

651 return iter_sha1(self._itersha()) 

652 

653 def _itersha(self) -> Iterator[bytes]: 

654 """Yield all the SHA1's of the objects in the index, sorted.""" 

655 raise NotImplementedError(self._itersha) 

656 

657 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

658 """Iterate over all SHA1s with the given prefix. 

659 

660 Args: 

661 prefix: Binary prefix to match 

662 Returns: Iterator of matching SHA1s 

663 """ 

664 # Default implementation for PackIndex classes that don't override 

665 for sha, _, _ in self.iterentries(): 

666 if sha.startswith(prefix): 

667 yield sha 

668 

669 def close(self) -> None: 

670 """Close any open files.""" 

671 

672 def check(self) -> None: 

673 """Check the consistency of this pack index.""" 

674 

675 

676class MemoryPackIndex(PackIndex): 

677 """Pack index that is stored entirely in memory.""" 

678 

679 def __init__( 

680 self, 

681 entries: list[tuple[bytes, int, Optional[int]]], 

682 pack_checksum: Optional[bytes] = None, 

683 ) -> None: 

684 """Create a new MemoryPackIndex. 

685 

686 Args: 

687 entries: Sequence of name, idx, crc32 (sorted) 

688 pack_checksum: Optional pack checksum 

689 """ 

690 self._by_sha = {} 

691 self._by_offset = {} 

692 for name, offset, _crc32 in entries: 

693 self._by_sha[name] = offset 

694 self._by_offset[offset] = name 

695 self._entries = entries 

696 self._pack_checksum = pack_checksum 

697 

698 def get_pack_checksum(self) -> Optional[bytes]: 

699 """Return the SHA checksum stored for the corresponding packfile.""" 

700 return self._pack_checksum 

701 

702 def __len__(self) -> int: 

703 """Return the number of entries in this pack index.""" 

704 return len(self._entries) 

705 

706 def object_offset(self, sha: bytes) -> int: 

707 """Return the offset for the given SHA. 

708 

709 Args: 

710 sha: SHA to look up (binary or hex) 

711 Returns: Offset in the pack file 

712 """ 

713 if len(sha) == 40: 

714 sha = hex_to_sha(sha) 

715 return self._by_sha[sha] 

716 

717 def object_sha1(self, offset: int) -> bytes: 

718 """Return the SHA1 for the object at the given offset.""" 

719 return self._by_offset[offset] 

720 

721 def _itersha(self) -> Iterator[bytes]: 

722 """Iterate over all SHA1s in the index.""" 

723 return iter(self._by_sha) 

724 

725 def iterentries(self) -> Iterator[PackIndexEntry]: 

726 """Iterate over all index entries.""" 

727 return iter(self._entries) 

728 

729 @classmethod 

730 def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex": 

731 """Create a MemoryPackIndex from a PackData object.""" 

732 return MemoryPackIndex( 

733 list(pack_data.sorted_entries()), pack_data.get_stored_checksum() 

734 ) 

735 

736 @classmethod 

737 def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex": 

738 """Create a copy of another PackIndex in memory.""" 

739 return cls(list(other_index.iterentries()), other_index.get_pack_checksum()) 

740 

741 

742class FilePackIndex(PackIndex): 

743 """Pack index that is based on a file. 

744 

745 To perform a lookup it opens the file and reads the fan-out table: 256 

746 four-byte entries indexed by the first byte of the sha id. The value in the 

747 indexed entry is the end of the group of entries that share that starting 

748 byte; subtract one from the starting byte and index again to find the start 

749 of the group. 

750 The entries are sorted by sha id within the group, so from the start and end 

751 offsets a bisection search determines whether the value is present. 

752 """ 

753 

754 _fan_out_table: list[int] 

755 _file: Union[IO[bytes], _GitFile] 

756 

757 def __init__( 

758 self, 

759 filename: Union[str, os.PathLike[str]], 

760 file: Optional[Union[IO[bytes], _GitFile]] = None, 

761 contents: Optional[Union[bytes, "mmap.mmap"]] = None, 

762 size: Optional[int] = None, 

763 ) -> None: 

764 """Create a pack index object. 

765 

766 Provide it with the name of the index file to consider, and it will map 

767 it whenever required. 

768 """ 

769 self._filename = filename 

770 # Take the size now, so it can be checked each time we map the file to 

771 # ensure that it hasn't changed. 

772 if file is None: 

773 self._file = GitFile(filename, "rb") 

774 else: 

775 self._file = file 

776 if contents is None: 

777 self._contents, self._size = _load_file_contents(self._file, size) 

778 else: 

779 self._contents = contents 

780 self._size = size if size is not None else len(contents) 

781 

782 @property 

783 def path(self) -> str: 

784 """Return the path to this index file.""" 

785 return os.fspath(self._filename) 

786 

787 def __eq__(self, other: object) -> bool: 

788 """Check equality with another FilePackIndex.""" 

789 # Quick optimization: 

790 if ( 

791 isinstance(other, FilePackIndex) 

792 and self._fan_out_table != other._fan_out_table 

793 ): 

794 return False 

795 

796 return super().__eq__(other) 

797 

798 def close(self) -> None: 

799 """Close the underlying file and any mmap.""" 

800 self._file.close() 

801 close_fn = getattr(self._contents, "close", None) 

802 if close_fn is not None: 

803 close_fn() 

804 

805 def __len__(self) -> int: 

806 """Return the number of entries in this pack index.""" 

807 return self._fan_out_table[-1] 

808 

809 def _unpack_entry(self, i: int) -> PackIndexEntry: 

810 """Unpack the i-th entry in the index file. 

811 

812 Returns: Tuple with object name (SHA), offset in pack file and CRC32 

813 checksum (if known). 

814 """ 

815 raise NotImplementedError(self._unpack_entry) 

816 

817 def _unpack_name(self, i: int) -> bytes: 

818 """Unpack the i-th name from the index file.""" 

819 raise NotImplementedError(self._unpack_name) 

820 

821 def _unpack_offset(self, i: int) -> int: 

822 """Unpack the i-th object offset from the index file.""" 

823 raise NotImplementedError(self._unpack_offset) 

824 

825 def _unpack_crc32_checksum(self, i: int) -> Optional[int]: 

826 """Unpack the crc32 checksum for the ith object from the index file.""" 

827 raise NotImplementedError(self._unpack_crc32_checksum) 

828 

829 def _itersha(self) -> Iterator[bytes]: 

830 """Iterate over all SHA1s in the index.""" 

831 for i in range(len(self)): 

832 yield self._unpack_name(i) 

833 

834 def iterentries(self) -> Iterator[PackIndexEntry]: 

835 """Iterate over the entries in this pack index. 

836 

837 Returns: iterator over tuples with object name, offset in packfile and 

838 crc32 checksum. 

839 """ 

840 for i in range(len(self)): 

841 yield self._unpack_entry(i) 

842 

843 def _read_fan_out_table(self, start_offset: int) -> list[int]: 

844 """Read the fan-out table from the index. 

845 

846 The fan-out table contains 256 entries mapping first byte values 

847 to the number of objects with SHA1s less than or equal to that byte. 

848 

849 Args: 

850 start_offset: Offset in the file where the fan-out table starts 

851 Returns: List of 256 integers 

852 """ 

853 ret = [] 

854 for i in range(0x100): 

855 fanout_entry = self._contents[ 

856 start_offset + i * 4 : start_offset + (i + 1) * 4 

857 ] 

858 ret.append(struct.unpack(">L", fanout_entry)[0]) 

859 return ret 

860 

861 def check(self) -> None: 

862 """Check that the stored checksum matches the actual checksum.""" 

863 actual = self.calculate_checksum() 

864 stored = self.get_stored_checksum() 

865 if actual != stored: 

866 raise ChecksumMismatch(stored, actual) 

867 

868 def calculate_checksum(self) -> bytes: 

869 """Calculate the SHA1 checksum over this pack index. 

870 

871 Returns: This is a 20-byte binary digest 

872 """ 

873 return sha1(self._contents[:-20]).digest() 

874 

875 def get_pack_checksum(self) -> bytes: 

876 """Return the SHA1 checksum stored for the corresponding packfile. 

877 

878 Returns: 20-byte binary digest 

879 """ 

880 return bytes(self._contents[-40:-20]) 

881 

882 def get_stored_checksum(self) -> bytes: 

883 """Return the SHA1 checksum stored for this index. 

884 

885 Returns: 20-byte binary digest 

886 """ 

887 return bytes(self._contents[-20:]) 

888 

889 def object_offset(self, sha: bytes) -> int: 

890 """Return the offset in to the corresponding packfile for the object. 

891 

892 Given the name of an object it will return the offset that object 

893 lives at within the corresponding pack file. If the pack file doesn't 

894 have the object then None will be returned. 

895 """ 

896 if len(sha) == 40: 

897 sha = hex_to_sha(sha) 

898 try: 

899 return self._object_offset(sha) 

900 except ValueError as exc: 

901 closed = getattr(self._contents, "closed", None) 

902 if closed in (None, True): 

903 raise PackFileDisappeared(self) from exc 

904 raise 

905 

906 def _object_offset(self, sha: bytes) -> int: 

907 """See object_offset. 

908 

909 Args: 

910 sha: A *binary* SHA string (20 bytes long). 

911 """ 

912 assert len(sha) == 20 

913 idx = ord(sha[:1]) 

914 if idx == 0: 

915 start = 0 

916 else: 

917 start = self._fan_out_table[idx - 1] 

918 end = self._fan_out_table[idx] 

919 i = bisect_find_sha(start, end, sha, self._unpack_name) 

920 if i is None: 

921 raise KeyError(sha) 

922 return self._unpack_offset(i) 

923 

924 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

925 """Iterate over all SHA1s with the given prefix.""" 

926 start = ord(prefix[:1]) 

927 if start == 0: 

928 start = 0 

929 else: 

930 start = self._fan_out_table[start - 1] 

931 end = ord(prefix[:1]) + 1 

932 if end == 0x100: 

933 end = len(self) 

934 else: 

935 end = self._fan_out_table[end] 

936 assert start <= end 

937 started = False 

938 for i in range(start, end): 

939 name: bytes = self._unpack_name(i) 

940 if name.startswith(prefix): 

941 yield name 

942 started = True 

943 elif started: 

944 break 

945 

946 

947class PackIndex1(FilePackIndex): 

948 """Version 1 Pack Index file.""" 

949 

950 def __init__( 

951 self, 

952 filename: Union[str, os.PathLike[str]], 

953 file: Optional[Union[IO[bytes], _GitFile]] = None, 

954 contents: Optional[bytes] = None, 

955 size: Optional[int] = None, 

956 ) -> None: 

957 """Initialize a version 1 pack index. 

958 

959 Args: 

960 filename: Path to the index file 

961 file: Optional file object 

962 contents: Optional mmap'd contents 

963 size: Optional size of the index 

964 """ 

965 super().__init__(filename, file, contents, size) 

966 self.version = 1 

967 self._fan_out_table = self._read_fan_out_table(0) 

968 

969 def _unpack_entry(self, i: int) -> tuple[bytes, int, None]: 

970 (offset, name) = unpack_from(">L20s", self._contents, (0x100 * 4) + (i * 24)) 

971 return (name, offset, None) 

972 

973 def _unpack_name(self, i: int) -> bytes: 

974 offset = (0x100 * 4) + (i * 24) + 4 

975 return self._contents[offset : offset + 20] 

976 

977 def _unpack_offset(self, i: int) -> int: 

978 offset = (0x100 * 4) + (i * 24) 

979 result = unpack_from(">L", self._contents, offset)[0] 

980 assert isinstance(result, int) 

981 return result 

982 

983 def _unpack_crc32_checksum(self, i: int) -> None: 

984 # Not stored in v1 index files 

985 return None 

986 

987 

988class PackIndex2(FilePackIndex): 

989 """Version 2 Pack Index file.""" 

990 

991 def __init__( 

992 self, 

993 filename: Union[str, os.PathLike[str]], 

994 file: Optional[Union[IO[bytes], _GitFile]] = None, 

995 contents: Optional[bytes] = None, 

996 size: Optional[int] = None, 

997 ) -> None: 

998 """Initialize a version 2 pack index. 

999 

1000 Args: 

1001 filename: Path to the index file 

1002 file: Optional file object 

1003 contents: Optional mmap'd contents 

1004 size: Optional size of the index 

1005 """ 

1006 super().__init__(filename, file, contents, size) 

1007 if self._contents[:4] != b"\377tOc": 

1008 raise AssertionError("Not a v2 pack index file") 

1009 (self.version,) = unpack_from(b">L", self._contents, 4) 

1010 if self.version != 2: 

1011 raise AssertionError(f"Version was {self.version}") 

1012 self._fan_out_table = self._read_fan_out_table(8) 

1013 self._name_table_offset = 8 + 0x100 * 4 

1014 self._crc32_table_offset = self._name_table_offset + 20 * len(self) 

1015 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1016 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1017 self 

1018 ) 

1019 

1020 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1021 return ( 

1022 self._unpack_name(i), 

1023 self._unpack_offset(i), 

1024 self._unpack_crc32_checksum(i), 

1025 ) 

1026 

1027 def _unpack_name(self, i: int) -> bytes: 

1028 offset = self._name_table_offset + i * 20 

1029 return self._contents[offset : offset + 20] 

1030 

1031 def _unpack_offset(self, i: int) -> int: 

1032 offset_pos = self._pack_offset_table_offset + i * 4 

1033 offset = unpack_from(">L", self._contents, offset_pos)[0] 

1034 assert isinstance(offset, int) 

1035 if offset & (2**31): 

1036 large_offset_pos = ( 

1037 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1038 ) 

1039 offset = unpack_from(">Q", self._contents, large_offset_pos)[0] 

1040 assert isinstance(offset, int) 

1041 return offset 

1042 

1043 def _unpack_crc32_checksum(self, i: int) -> int: 

1044 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1045 assert isinstance(result, int) 

1046 return result 

1047 

1048 

1049class PackIndex3(FilePackIndex): 

1050 """Version 3 Pack Index file. 

1051 

1052 Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes). 

1053 """ 

1054 

1055 def __init__( 

1056 self, 

1057 filename: Union[str, os.PathLike[str]], 

1058 file: Optional[Union[IO[bytes], _GitFile]] = None, 

1059 contents: Optional[bytes] = None, 

1060 size: Optional[int] = None, 

1061 ) -> None: 

1062 """Initialize a version 3 pack index. 

1063 

1064 Args: 

1065 filename: Path to the index file 

1066 file: Optional file object 

1067 contents: Optional mmap'd contents 

1068 size: Optional size of the index 

1069 """ 

1070 super().__init__(filename, file, contents, size) 

1071 if self._contents[:4] != b"\377tOc": 

1072 raise AssertionError("Not a v3 pack index file") 

1073 (self.version,) = unpack_from(b">L", self._contents, 4) 

1074 if self.version != 3: 

1075 raise AssertionError(f"Version was {self.version}") 

1076 

1077 # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1078 (self.hash_algorithm,) = unpack_from(b">L", self._contents, 8) 

1079 if self.hash_algorithm == 1: 

1080 self.hash_size = 20 # SHA-1 

1081 elif self.hash_algorithm == 2: 

1082 self.hash_size = 32 # SHA-256 

1083 else: 

1084 raise AssertionError(f"Unknown hash algorithm {self.hash_algorithm}") 

1085 

1086 # Read length of shortened object names 

1087 (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12) 

1088 

1089 # Calculate offsets based on variable hash size 

1090 self._fan_out_table = self._read_fan_out_table( 

1091 16 

1092 ) # After header (4 + 4 + 4 + 4) 

1093 self._name_table_offset = 16 + 0x100 * 4 

1094 self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self) 

1095 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1096 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1097 self 

1098 ) 

1099 

1100 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1101 return ( 

1102 self._unpack_name(i), 

1103 self._unpack_offset(i), 

1104 self._unpack_crc32_checksum(i), 

1105 ) 

1106 

1107 def _unpack_name(self, i: int) -> bytes: 

1108 offset = self._name_table_offset + i * self.hash_size 

1109 return self._contents[offset : offset + self.hash_size] 

1110 

1111 def _unpack_offset(self, i: int) -> int: 

1112 offset_pos = self._pack_offset_table_offset + i * 4 

1113 offset = unpack_from(">L", self._contents, offset_pos)[0] 

1114 assert isinstance(offset, int) 

1115 if offset & (2**31): 

1116 large_offset_pos = ( 

1117 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1118 ) 

1119 offset = unpack_from(">Q", self._contents, large_offset_pos)[0] 

1120 assert isinstance(offset, int) 

1121 return offset 

1122 

1123 def _unpack_crc32_checksum(self, i: int) -> int: 

1124 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1125 assert isinstance(result, int) 

1126 return result 

1127 

1128 

1129def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]: 

1130 """Read the header of a pack file. 

1131 

1132 Args: 

1133 read: Read function 

1134 Returns: Tuple of (pack version, number of objects). 

1135 Raises: AssertionError if the header is missing or not a valid pack header. 

1136 """ 

1137 header = read(12) 

1138 if not header: 

1139 raise AssertionError("file too short to contain pack") 

1140 if header[:4] != b"PACK": 

1141 raise AssertionError(f"Invalid pack header {header!r}") 

1142 (version,) = unpack_from(b">L", header, 4) 

1143 if version not in (2, 3): 

1144 raise AssertionError(f"Version was {version}") 

1145 (num_objects,) = unpack_from(b">L", header, 8) 

1146 return (version, num_objects) 
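
# Illustrative example: a version-2 pack header announcing 3 objects.
#
#     >>> read_pack_header(BytesIO(b"PACK" + struct.pack(">LL", 2, 3)).read)
#     (2, 3)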

1147 

1148 

1149def chunks_length(chunks: Union[bytes, Iterable[bytes]]) -> int: 

1150 """Get the total length of a sequence of chunks. 

1151 

1152 Args: 

1153 chunks: Either a single bytes object or an iterable of bytes 

1154 Returns: Total length in bytes 

1155 """ 

1156 if isinstance(chunks, bytes): 

1157 return len(chunks) 

1158 else: 

1159 return sum(map(len, chunks)) 

1160 

1161 

1162def unpack_object( 

1163 read_all: Callable[[int], bytes], 

1164 read_some: Optional[Callable[[int], bytes]] = None, 

1165 compute_crc32: bool = False, 

1166 include_comp: bool = False, 

1167 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1168) -> tuple[UnpackedObject, bytes]: 

1169 """Unpack a Git object. 

1170 

1171 Args: 

1172 read_all: Read function that blocks until the number of requested 

1173 bytes are read. 

1174 read_some: Read function that returns at least one byte, but may not 

1175 return the number of bytes requested. 

1176 compute_crc32: If True, compute the CRC32 of the compressed data. If 

1177 False, the returned CRC32 will be None. 

1178 include_comp: If True, include compressed data in the result. 

1179 zlib_bufsize: An optional buffer size for zlib operations. 

1180 Returns: A tuple of (unpacked, unused), where unused is the unused data 

1181 leftover from decompression, and unpacked in an UnpackedObject with 

1182 the following attrs set: 

1183 

1184 * obj_chunks (for non-delta types) 

1185 * pack_type_num 

1186 * delta_base (for delta types) 

1187 * comp_chunks (if include_comp is True) 

1188 * decomp_chunks 

1189 * decomp_len 

1190 * crc32 (if compute_crc32 is True) 

1191 """ 

1192 if read_some is None: 

1193 read_some = read_all 

1194 if compute_crc32: 

1195 crc32 = 0 

1196 else: 

1197 crc32 = None 

1198 

1199 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1200 type_num = (raw[0] >> 4) & 0x07 

1201 size = raw[0] & 0x0F 

1202 for i, byte in enumerate(raw[1:]): 

1203 size += (byte & 0x7F) << ((i * 7) + 4) 

1204 

1205 delta_base: Union[int, bytes, None] 

1206 raw_base = len(raw) 

1207 if type_num == OFS_DELTA: 

1208 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1209 raw_base += len(raw) 

1210 if raw[-1] & 0x80: 

1211 raise AssertionError 

1212 delta_base_offset = raw[0] & 0x7F 

1213 for byte in raw[1:]: 

1214 delta_base_offset += 1 

1215 delta_base_offset <<= 7 

1216 delta_base_offset += byte & 0x7F 

1217 delta_base = delta_base_offset 

1218 elif type_num == REF_DELTA: 

1219 delta_base_obj = read_all(20) 

1220 if crc32 is not None: 

1221 crc32 = binascii.crc32(delta_base_obj, crc32) 

1222 delta_base = delta_base_obj 

1223 raw_base += 20 

1224 else: 

1225 delta_base = None 

1226 

1227 unpacked = UnpackedObject( 

1228 type_num, delta_base=delta_base, decomp_len=size, crc32=crc32 

1229 ) 

1230 unused = read_zlib_chunks( 

1231 read_some, 

1232 unpacked, 

1233 buffer_size=zlib_bufsize, 

1234 include_comp=include_comp, 

1235 ) 

1236 return unpacked, unused 
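
# Illustrative example: a non-delta blob containing b"hello".  The header byte
# 0x35 encodes type 3 (blob) and size 5; one extra byte follows the zlib
# stream because read_zlib_chunks() expects trailing data, as a real pack
# always has (the next object or the trailer).
#
#     >>> data = bytes([0x35]) + zlib.compress(b"hello") + b"\x00"
#     >>> unpacked, unused = unpack_object(BytesIO(data).read)
#     >>> unpacked.pack_type_num, unpacked.decomp_chunks, unused
#     (3, [b'hello'], b'\x00')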

1237 

1238 

1239def _compute_object_size(value: tuple[int, Any]) -> int: 

1240 """Compute the size of a unresolved object for use with LRUSizeCache.""" 

1241 (num, obj) = value 

1242 if num in DELTA_TYPES: 

1243 return chunks_length(obj[1]) 

1244 return chunks_length(obj) 

1245 

1246 

1247class PackStreamReader: 

1248 """Class to read a pack stream. 

1249 

1250 The pack is read from a ReceivableProtocol using read() or recv() as 

1251 appropriate. 

1252 """ 

1253 

1254 def __init__( 

1255 self, 

1256 read_all: Callable[[int], bytes], 

1257 read_some: Optional[Callable[[int], bytes]] = None, 

1258 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1259 ) -> None: 

1260 """Initialize pack stream reader. 

1261 

1262 Args: 

1263 read_all: Function to read all requested bytes 

1264 read_some: Function to read some bytes (optional) 

1265 zlib_bufsize: Buffer size for zlib decompression 

1266 """ 

1267 self.read_all = read_all 

1268 if read_some is None: 

1269 self.read_some = read_all 

1270 else: 

1271 self.read_some = read_some 

1272 self.sha = sha1() 

1273 self._offset = 0 

1274 self._rbuf = BytesIO() 

1275 # trailer is a deque to avoid memory allocation on small reads 

1276 self._trailer: deque[int] = deque() 

1277 self._zlib_bufsize = zlib_bufsize 

1278 

1279 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1280 """Read up to size bytes using the given callback. 

1281 

1282 As a side effect, update the verifier's hash (excluding the last 20 

1283 bytes read). 

1284 

1285 Args: 

1286 read: The read callback to read from. 

1287 size: The maximum number of bytes to read; the particular 

1288 behavior is callback-specific. 

1289 Returns: Bytes read 

1290 """ 

1291 data = read(size) 

1292 

1293 # maintain a trailer of the last 20 bytes we've read 

1294 n = len(data) 

1295 self._offset += n 

1296 tn = len(self._trailer) 

1297 if n >= 20: 

1298 to_pop = tn 

1299 to_add = 20 

1300 else: 

1301 to_pop = max(n + tn - 20, 0) 

1302 to_add = n 

1303 self.sha.update( 

1304 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)])) 

1305 ) 

1306 self._trailer.extend(data[-to_add:]) 

1307 

1308 # hash everything but the trailer 

1309 self.sha.update(data[:-to_add]) 

1310 return data 

1311 

1312 def _buf_len(self) -> int: 

1313 buf = self._rbuf 

1314 start = buf.tell() 

1315 buf.seek(0, SEEK_END) 

1316 end = buf.tell() 

1317 buf.seek(start) 

1318 return end - start 

1319 

1320 @property 

1321 def offset(self) -> int: 

1322 """Return current offset in the stream.""" 

1323 return self._offset - self._buf_len() 

1324 

1325 def read(self, size: int) -> bytes: 

1326 """Read, blocking until size bytes are read.""" 

1327 buf_len = self._buf_len() 

1328 if buf_len >= size: 

1329 return self._rbuf.read(size) 

1330 buf_data = self._rbuf.read() 

1331 self._rbuf = BytesIO() 

1332 return buf_data + self._read(self.read_all, size - buf_len) 

1333 

1334 def recv(self, size: int) -> bytes: 

1335 """Read up to size bytes, blocking until one byte is read.""" 

1336 buf_len = self._buf_len() 

1337 if buf_len: 

1338 data = self._rbuf.read(size) 

1339 if size >= buf_len: 

1340 self._rbuf = BytesIO() 

1341 return data 

1342 return self._read(self.read_some, size) 

1343 

1344 def __len__(self) -> int: 

1345 """Return the number of objects in this pack.""" 

1346 return self._num_objects 

1347 

1348 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]: 

1349 """Read the objects in this pack file. 

1350 

1351 Args: 

1352 compute_crc32: If True, compute the CRC32 of the compressed 

1353 data. If False, the returned CRC32 will be None. 

1354 Returns: Iterator over UnpackedObjects with the following members set: 

1355 offset 

1356 obj_type_num 

1357 obj_chunks (for non-delta types) 

1358 delta_base (for delta types) 

1359 decomp_chunks 

1360 decomp_len 

1361 crc32 (if compute_crc32 is True) 

1362 

1363 Raises: 

1364 ChecksumMismatch: if the checksum of the pack contents does not 

1365 match the checksum in the pack trailer. 

1366 zlib.error: if an error occurred during zlib decompression. 

1367 IOError: if an error occurred writing to the output file. 

1368 """ 

1369 _pack_version, self._num_objects = read_pack_header(self.read) 

1370 

1371 for _ in range(self._num_objects): 

1372 offset = self.offset 

1373 unpacked, unused = unpack_object( 

1374 self.read, 

1375 read_some=self.recv, 

1376 compute_crc32=compute_crc32, 

1377 zlib_bufsize=self._zlib_bufsize, 

1378 ) 

1379 unpacked.offset = offset 

1380 

1381 # prepend any unused data to current read buffer 

1382 buf = BytesIO() 

1383 buf.write(unused) 

1384 buf.write(self._rbuf.read()) 

1385 buf.seek(0) 

1386 self._rbuf = buf 

1387 

1388 yield unpacked 

1389 

1390 if self._buf_len() < 20: 

1391 # If the read buffer is full, then the last read() got the whole 

1392 # trailer off the wire. If not, it means there is still some of the 

1393 # trailer to read. We need to read() all 20 bytes; N come from the 

1394 # read buffer and (20 - N) come from the wire. 

1395 self.read(20) 

1396 

1397 pack_sha = bytearray(self._trailer) 

1398 if pack_sha != self.sha.digest(): 

1399 raise ChecksumMismatch(sha_to_hex(bytes(pack_sha)), self.sha.hexdigest()) 

1400 
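
# Illustrative example: a minimal in-memory pack stream (header, one blob,
# SHA-1 trailer) read back and checksum-verified by read_objects().
#
#     >>> body = b"PACK" + struct.pack(">LL", 2, 1) + bytes([0x35]) + zlib.compress(b"hello")
#     >>> reader = PackStreamReader(BytesIO(body + sha1(body).digest()).read)
#     >>> [obj.obj_chunks for obj in reader.read_objects()]
#     [[b'hello']]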

1401 

1402class PackStreamCopier(PackStreamReader): 

1403 """Class to verify a pack stream as it is being read. 

1404 

1405 The pack is read from a ReceivableProtocol using read() or recv() as 

1406 appropriate and written out to the given file-like object. 

1407 """ 

1408 

1409 def __init__( 

1410 self, 

1411 read_all: Callable[[int], bytes], 

1412 read_some: Optional[Callable[[int], bytes]], 

1413 outfile: IO[bytes], 

1414 delta_iter: Optional["DeltaChainIterator[UnpackedObject]"] = None, 

1415 ) -> None: 

1416 """Initialize the copier. 

1417 

1418 Args: 

1419 read_all: Read function that blocks until the number of 

1420 requested bytes are read. 

1421 read_some: Read function that returns at least one byte, but may 

1422 not return the number of bytes requested. 

1423 outfile: File-like object to write output through. 

1424 delta_iter: Optional DeltaChainIterator to record deltas as we 

1425 read them. 

1426 """ 

1427 super().__init__(read_all, read_some=read_some) 

1428 self.outfile = outfile 

1429 self._delta_iter = delta_iter 

1430 

1431 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1432 """Read data from the read callback and write it to the file.""" 

1433 data = super()._read(read, size) 

1434 self.outfile.write(data) 

1435 return data 

1436 

1437 def verify(self, progress: Optional[Callable[..., None]] = None) -> None: 

1438 """Verify a pack stream and write it to the output file. 

1439 

1440 See PackStreamReader.read_objects for a list of exceptions this may 

1441 raise. 

1442 """ 

1443 i = 0 # default count of entries if read_objects() is empty 

1444 for i, unpacked in enumerate(self.read_objects()): 

1445 if self._delta_iter: 

1446 self._delta_iter.record(unpacked) 

1447 if progress is not None: 

1448 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii")) 

1449 if progress is not None: 

1450 progress(f"copied {i} pack entries\n".encode("ascii")) 

1451 

1452 

1453def obj_sha(type: int, chunks: Union[bytes, Iterable[bytes]]) -> bytes: 

1454 """Compute the SHA for a numeric type and object chunks.""" 

1455 sha = sha1() 

1456 sha.update(object_header(type, chunks_length(chunks))) 

1457 if isinstance(chunks, bytes): 

1458 sha.update(chunks) 

1459 else: 

1460 for chunk in chunks: 

1461 sha.update(chunk) 

1462 return sha.digest() 

1463 

1464 

1465def compute_file_sha( 

1466 f: IO[bytes], start_ofs: int = 0, end_ofs: int = 0, buffer_size: int = 1 << 16 

1467) -> "HashObject": 

1468 """Hash a portion of a file into a new SHA. 

1469 

1470 Args: 

1471 f: A file-like object to read from that supports seek(). 

1472 start_ofs: The offset in the file to start reading at. 

1473 end_ofs: The offset in the file to end reading at, relative to the 

1474 end of the file. 

1475 buffer_size: A buffer size for reading. 

1476 Returns: A new SHA object updated with data read from the file. 

1477 """ 

1478 sha = sha1() 

1479 f.seek(0, SEEK_END) 

1480 length = f.tell() 

1481 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length: 

1482 raise AssertionError( 

1483 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}" 

1484 ) 

1485 todo = length + end_ofs - start_ofs 

1486 f.seek(start_ofs) 

1487 while todo: 

1488 data = f.read(min(todo, buffer_size)) 

1489 sha.update(data) 

1490 todo -= len(data) 

1491 return sha 

1492 

1493 

1494class PackData: 

1495 """The data contained in a packfile. 

1496 

1497 Pack files can be accessed both sequentially for exploding a pack, and 

1498 directly with the help of an index to retrieve a specific object. 

1499 

1500 The objects within are either complete or a delta against another. 

1501 

1502 The header is variable length. If the MSB of each byte is set then it 

1503 indicates that the subsequent byte is still part of the header. 

1504 For the first byte, the next three MS bits are the type, which tells you the 

1505 type of object and whether it is a delta, and the LS four bits are the lowest 

1506 bits of the size. For each subsequent byte the LS 7 bits are the next MS bits 

1507 of the size, i.e. the last byte of the header contains the MS bits of the size. 

1508 

1509 For the complete objects the data is stored as zlib deflated data. 

1510 The size in the header is the uncompressed object size, so to uncompress 

1511 you need to just keep feeding data to zlib until you get an object back, 

1512 or it errors on bad data. This is done here by just giving the complete 

1513 buffer from the start of the deflated object on. This is bad, but until I 

1514 get mmap sorted out it will have to do. 

1515 

1516 Currently there are no integrity checks done. Also no attempt is made to 

1517 try and detect the delta case, or a request for an object at the wrong 

1518 position. It will all just throw a zlib or KeyError. 

1519 """ 

1520 

1521 def __init__( 

1522 self, 

1523 filename: Union[str, os.PathLike[str]], 

1524 file: Optional[IO[bytes]] = None, 

1525 size: Optional[int] = None, 

1526 *, 

1527 delta_window_size: Optional[int] = None, 

1528 window_memory: Optional[int] = None, 

1529 delta_cache_size: Optional[int] = None, 

1530 depth: Optional[int] = None, 

1531 threads: Optional[int] = None, 

1532 big_file_threshold: Optional[int] = None, 

1533 ) -> None: 

1534 """Create a PackData object representing the pack in the given filename. 

1535 

1536 The file must exist and stay readable until the object is disposed of. 

1537 It must also stay the same size. It will be mapped whenever needed. 

1538 

1539 Currently there is a restriction on the size of the pack as the python 

1540 mmap implementation is flawed. 

1541 """ 

1542 self._filename = filename 

1543 self._size = size 

1544 self._header_size = 12 

1545 self.delta_window_size = delta_window_size 

1546 self.window_memory = window_memory 

1547 self.delta_cache_size = delta_cache_size 

1548 self.depth = depth 

1549 self.threads = threads 

1550 self.big_file_threshold = big_file_threshold 

1551 self._file: IO[bytes] 

1552 

1553 if file is None: 

1554 self._file = GitFile(self._filename, "rb") 

1555 else: 

1556 self._file = file 

1557 (_version, self._num_objects) = read_pack_header(self._file.read) 

1558 

1559 # Use delta_cache_size config if available, otherwise default 

1560 cache_size = delta_cache_size or (1024 * 1024 * 20) 

1561 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]]( 

1562 cache_size, compute_size=_compute_object_size 

1563 ) 

1564 

1565 @property 

1566 def filename(self) -> str: 

1567 """Get the filename of the pack file. 

1568 

1569 Returns: 

1570 Base filename without directory path 

1571 """ 

1572 return os.path.basename(self._filename) 

1573 

1574 @property 

1575 def path(self) -> Union[str, os.PathLike[str]]: 

1576 """Get the full path of the pack file. 

1577 

1578 Returns: 

1579 Full path to the pack file 

1580 """ 

1581 return self._filename 

1582 

1583 @classmethod 

1584 def from_file(cls, file: IO[bytes], size: Optional[int] = None) -> "PackData": 

1585 """Create a PackData object from an open file. 

1586 

1587 Args: 

1588 file: Open file object 

1589 size: Optional file size 

1590 

1591 Returns: 

1592 PackData instance 

1593 """ 

1594 return cls(str(file), file=file, size=size) 

1595 

1596 @classmethod 

1597 def from_path(cls, path: Union[str, os.PathLike[str]]) -> "PackData": 

1598 """Create a PackData object from a file path. 

1599 

1600 Args: 

1601 path: Path to the pack file 

1602 

1603 Returns: 

1604 PackData instance 

1605 """ 

1606 return cls(filename=path) 

1607 

1608 def close(self) -> None: 

1609 """Close the underlying pack file.""" 

1610 self._file.close() 

1611 

1612 def __enter__(self) -> "PackData": 

1613 """Enter context manager.""" 

1614 return self 

1615 

1616 def __exit__( 

1617 self, 

1618 exc_type: Optional[type], 

1619 exc_val: Optional[BaseException], 

1620 exc_tb: Optional[TracebackType], 

1621 ) -> None: 

1622 """Exit context manager.""" 

1623 self.close() 

1624 

1625 def __eq__(self, other: object) -> bool: 

1626 """Check equality with another object.""" 

1627 if isinstance(other, PackData): 

1628 return self.get_stored_checksum() == other.get_stored_checksum() 

1629 return False 

1630 

1631 def _get_size(self) -> int: 

1632 if self._size is not None: 

1633 return self._size 

1634 self._size = os.path.getsize(self._filename) 

1635 if self._size < self._header_size: 

1636 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})" 

1637 raise AssertionError(errmsg) 

1638 return self._size 

1639 

1640 def __len__(self) -> int: 

1641 """Returns the number of objects in this pack.""" 

1642 return self._num_objects 

1643 

1644 def calculate_checksum(self) -> bytes: 

1645 """Calculate the checksum for this pack. 

1646 

1647 Returns: 20-byte binary SHA1 digest 

1648 """ 

1649 return compute_file_sha(self._file, end_ofs=-20).digest() 

1650 

1651 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]: 

1652 """Iterate over unpacked objects in the pack.""" 

1653 self._file.seek(self._header_size) 

1654 

1655 if self._num_objects is None: 

1656 return 

1657 

1658 for _ in range(self._num_objects): 

1659 offset = self._file.tell() 

1660 unpacked, unused = unpack_object( 

1661 self._file.read, compute_crc32=False, include_comp=include_comp 

1662 ) 

1663 unpacked.offset = offset 

1664 yield unpacked 

1665 # Back up over unused data. 

1666 self._file.seek(-len(unused), SEEK_CUR) 

1667 

1668 def iterentries( 

1669 self, 

1670 progress: Optional[Callable[[int, int], None]] = None, 

1671 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1672 ) -> Iterator[tuple[bytes, int, Optional[int]]]: 

1673 """Yield entries summarizing the contents of this pack. 

1674 

1675 Args: 

1676 progress: Progress function, called with current and total 

1677 object count. 

1678 resolve_ext_ref: Optional function to resolve external references 

1679 Returns: Iterator of tuples with (sha, offset, crc32)

1680 """ 

1681 num_objects = self._num_objects 

1682 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref) 

1683 for i, result in enumerate(indexer): 

1684 if progress is not None: 

1685 progress(i, num_objects) 

1686 yield result 

1687 

1688 def sorted_entries( 

1689 self, 

1690 progress: Optional[ProgressFn] = None, 

1691 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1692 ) -> list[tuple[bytes, int, int]]: 

1693 """Return entries in this pack, sorted by SHA. 

1694 

1695 Args: 

1696 progress: Progress function, called with current and total 

1697 object count 

1698 resolve_ext_ref: Optional function to resolve external references 

1699 Returns: List of tuples with (sha, offset, crc32), sorted by SHA

1700 """ 

1701 return sorted( 

1702 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) # type: ignore 

1703 ) 

1704 

1705 def create_index_v1( 

1706 self, 

1707 filename: str, 

1708 progress: Optional[Callable[..., None]] = None, 

1709 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1710 ) -> bytes: 

1711 """Create a version 1 file for this data file. 

1712 

1713 Args: 

1714 filename: Index filename. 

1715 progress: Progress report function 

1716 resolve_ext_ref: Optional function to resolve external references 

1717 Returns: Checksum of index file 

1718 """ 

1719 entries = self.sorted_entries( 

1720 progress=progress, resolve_ext_ref=resolve_ext_ref 

1721 ) 

1722 checksum = self.calculate_checksum() 

1723 with GitFile(filename, "wb") as f: 

1724 write_pack_index_v1( 

1725 f, 

1726 entries, 

1727 checksum, 

1728 ) 

1729 return checksum 

1730 

1731 def create_index_v2( 

1732 self, 

1733 filename: str, 

1734 progress: Optional[Callable[..., None]] = None, 

1735 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1736 ) -> bytes: 

1737 """Create a version 2 index file for this data file. 

1738 

1739 Args: 

1740 filename: Index filename. 

1741 progress: Progress report function 

1742 resolve_ext_ref: Optional function to resolve external references 

1743 Returns: Checksum of index file 

1744 """ 

1745 entries = self.sorted_entries( 

1746 progress=progress, resolve_ext_ref=resolve_ext_ref 

1747 ) 

1748 with GitFile(filename, "wb") as f: 

1749 return write_pack_index_v2(f, entries, self.calculate_checksum()) 

1750 

1751 def create_index_v3( 

1752 self, 

1753 filename: str, 

1754 progress: Optional[Callable[..., None]] = None, 

1755 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1756 hash_algorithm: int = 1, 

1757 ) -> bytes: 

1758 """Create a version 3 index file for this data file. 

1759 

1760 Args: 

1761 filename: Index filename. 

1762 progress: Progress report function 

1763 resolve_ext_ref: Function to resolve external references 

1764 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1765 Returns: Checksum of index file 

1766 """ 

1767 entries = self.sorted_entries( 

1768 progress=progress, resolve_ext_ref=resolve_ext_ref 

1769 ) 

1770 with GitFile(filename, "wb") as f: 

1771 return write_pack_index_v3( 

1772 f, entries, self.calculate_checksum(), hash_algorithm 

1773 ) 

1774 

1775 def create_index( 

1776 self, 

1777 filename: str, 

1778 progress: Optional[Callable[..., None]] = None, 

1779 version: int = 2, 

1780 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1781 hash_algorithm: int = 1, 

1782 ) -> bytes: 

1783 """Create an index file for this data file. 

1784 

1785 Args: 

1786 filename: Index filename. 

1787 progress: Progress report function 

1788 version: Index version (1, 2, or 3) 

1789 resolve_ext_ref: Function to resolve external references 

1790 hash_algorithm: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256) 

1791 Returns: Checksum of index file 

1792 """ 

1793 if version == 1: 

1794 return self.create_index_v1( 

1795 filename, progress, resolve_ext_ref=resolve_ext_ref 

1796 ) 

1797 elif version == 2: 

1798 return self.create_index_v2( 

1799 filename, progress, resolve_ext_ref=resolve_ext_ref 

1800 ) 

1801 elif version == 3: 

1802 return self.create_index_v3( 

1803 filename, 

1804 progress, 

1805 resolve_ext_ref=resolve_ext_ref, 

1806 hash_algorithm=hash_algorithm, 

1807 ) 

1808 else: 

1809 raise ValueError(f"unknown index format {version}") 

1810 

1811 def get_stored_checksum(self) -> bytes: 

1812 """Return the expected checksum stored in this pack.""" 

1813 self._file.seek(-20, SEEK_END) 

1814 return self._file.read(20) 

1815 

1816 def check(self) -> None: 

1817 """Check the consistency of this pack.""" 

1818 actual = self.calculate_checksum() 

1819 stored = self.get_stored_checksum() 

1820 if actual != stored: 

1821 raise ChecksumMismatch(stored, actual) 

1822 

1823 def get_unpacked_object_at( 

1824 self, offset: int, *, include_comp: bool = False 

1825 ) -> UnpackedObject: 

1826 """Given offset in the packfile return a UnpackedObject.""" 

1827 assert offset >= self._header_size 

1828 self._file.seek(offset) 

1829 unpacked, _ = unpack_object(self._file.read, include_comp=include_comp) 

1830 unpacked.offset = offset 

1831 return unpacked 

1832 

1833 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]: 

1834 """Given an offset in to the packfile return the object that is there. 

1835 

1836 Using the associated index the location of an object can be looked up, 

1837 and then the packfile can be asked directly for that object using this 

1838 function. 

1839 """ 

1840 try: 

1841 return self._offset_cache[offset] 

1842 except KeyError: 

1843 pass 

1844 unpacked = self.get_unpacked_object_at(offset, include_comp=False) 

1845 return (unpacked.pack_type_num, unpacked._obj()) 

1846 
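# Usage sketch (illustrative; the paths and `sha` value below are assumptions,
# not part of this module): look an object's offset up in the index, then read
# the object straight out of the pack data at that offset.
#
#     data = PackData.from_path("objects/pack/pack-1234.pack")   # hypothetical path
#     index = load_pack_index("objects/pack/pack-1234.idx")      # hypothetical path
#     offset = index.object_offset(sha)      # `sha` is an object name (bytes)
#     type_num, raw = data.get_object_at(offset)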

1847 

1848T = TypeVar("T") 

1849 

1850 

1851class DeltaChainIterator(Generic[T]): 

1852 """Abstract iterator over pack data based on delta chains. 

1853 

1854 Each object in the pack is guaranteed to be inflated exactly once, 

1855 regardless of how many objects reference it as a delta base. As a result, 

1856 memory usage is proportional to the length of the longest delta chain. 

1857 

1858 Subclasses can override _result to define the result type of the iterator. 

1859 By default, results are UnpackedObjects with the following members set: 

1860 

1861 * offset 

1862 * obj_type_num 

1863 * obj_chunks 

1864 * pack_type_num 

1865 * delta_base (for delta types) 

1866 * comp_chunks (if _include_comp is True) 

1867 * decomp_chunks 

1868 * decomp_len 

1869 * crc32 (if _compute_crc32 is True) 

1870 """ 

1871 

1872 _compute_crc32 = False 

1873 _include_comp = False 

1874 

1875 def __init__( 

1876 self, 

1877 file_obj: Optional[IO[bytes]], 

1878 *, 

1879 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1880 ) -> None: 

1881 """Initialize DeltaChainIterator. 

1882 

1883 Args: 

1884 file_obj: File object to read pack data from 

1885 resolve_ext_ref: Optional function to resolve external references 

1886 """ 

1887 self._file = file_obj 

1888 self._resolve_ext_ref = resolve_ext_ref 

1889 self._pending_ofs: dict[int, list[int]] = defaultdict(list) 

1890 self._pending_ref: dict[bytes, list[int]] = defaultdict(list) 

1891 self._full_ofs: list[tuple[int, int]] = [] 

1892 self._ext_refs: list[bytes] = [] 

1893 

1894 @classmethod 

1895 def for_pack_data( 

1896 cls, pack_data: PackData, resolve_ext_ref: Optional[ResolveExtRefFn] = None 

1897 ) -> "DeltaChainIterator[T]": 

1898 """Create a DeltaChainIterator from pack data. 

1899 

1900 Args: 

1901 pack_data: PackData object to iterate 

1902 resolve_ext_ref: Optional function to resolve external refs 

1903 

1904 Returns: 

1905 DeltaChainIterator instance 

1906 """ 

1907 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1908 walker.set_pack_data(pack_data) 

1909 for unpacked in pack_data.iter_unpacked(include_comp=False): 

1910 walker.record(unpacked) 

1911 return walker 

1912 

1913 @classmethod 

1914 def for_pack_subset( 

1915 cls, 

1916 pack: "Pack", 

1917 shas: Iterable[bytes], 

1918 *, 

1919 allow_missing: bool = False, 

1920 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1921 ) -> "DeltaChainIterator[T]": 

1922 """Create a DeltaChainIterator for a subset of objects. 

1923 

1924 Args: 

1925 pack: Pack object containing the data 

1926 shas: Iterable of object SHAs to include 

1927 allow_missing: If True, skip missing objects 

1928 resolve_ext_ref: Optional function to resolve external refs 

1929 

1930 Returns: 

1931 DeltaChainIterator instance 

1932 """ 

1933 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1934 walker.set_pack_data(pack.data) 

1935 todo = set() 

1936 for sha in shas: 

1937 assert isinstance(sha, bytes) 

1938 try: 

1939 off = pack.index.object_offset(sha) 

1940 except KeyError: 

1941 if not allow_missing: 

1942 raise 

1943 else: 

1944 todo.add(off) 

1945 done = set() 

1946 while todo: 

1947 off = todo.pop() 

1948 unpacked = pack.data.get_unpacked_object_at(off) 

1949 walker.record(unpacked) 

1950 done.add(off) 

1951 base_ofs = None 

1952 if unpacked.pack_type_num == OFS_DELTA: 

1953 assert unpacked.offset is not None 

1954 assert unpacked.delta_base is not None 

1955 assert isinstance(unpacked.delta_base, int) 

1956 base_ofs = unpacked.offset - unpacked.delta_base 

1957 elif unpacked.pack_type_num == REF_DELTA: 

1958 with suppress(KeyError): 

1959 assert isinstance(unpacked.delta_base, bytes) 

1960 base_ofs = pack.index.object_index(unpacked.delta_base) 

1961 if base_ofs is not None and base_ofs not in done: 

1962 todo.add(base_ofs) 

1963 return walker 

1964 

1965 def record(self, unpacked: UnpackedObject) -> None: 

1966 """Record an unpacked object for later processing. 

1967 

1968 Args: 

1969 unpacked: UnpackedObject to record 

1970 """ 

1971 type_num = unpacked.pack_type_num 

1972 offset = unpacked.offset 

1973 assert offset is not None 

1974 if type_num == OFS_DELTA: 

1975 assert unpacked.delta_base is not None 

1976 assert isinstance(unpacked.delta_base, int) 

1977 base_offset = offset - unpacked.delta_base 

1978 self._pending_ofs[base_offset].append(offset) 

1979 elif type_num == REF_DELTA: 

1980 assert isinstance(unpacked.delta_base, bytes) 

1981 self._pending_ref[unpacked.delta_base].append(offset) 

1982 else: 

1983 self._full_ofs.append((offset, type_num)) 

1984 

1985 def set_pack_data(self, pack_data: PackData) -> None: 

1986 """Set the pack data for iteration. 

1987 

1988 Args: 

1989 pack_data: PackData object to use 

1990 """ 

1991 self._file = pack_data._file 

1992 

1993 def _walk_all_chains(self) -> Iterator[T]: 

1994 for offset, type_num in self._full_ofs: 

1995 yield from self._follow_chain(offset, type_num, None) 

1996 yield from self._walk_ref_chains() 

1997 assert not self._pending_ofs, repr(self._pending_ofs) 

1998 

1999 def _ensure_no_pending(self) -> None: 

2000 if self._pending_ref: 

2001 raise UnresolvedDeltas([sha_to_hex(s) for s in self._pending_ref]) 

2002 

2003 def _walk_ref_chains(self) -> Iterator[T]: 

2004 if not self._resolve_ext_ref: 

2005 self._ensure_no_pending() 

2006 return 

2007 

2008 for base_sha, pending in sorted(self._pending_ref.items()): 

2009 if base_sha not in self._pending_ref: 

2010 continue 

2011 try: 

2012 type_num, chunks = self._resolve_ext_ref(base_sha) 

2013 except KeyError: 

2014 # Not an external ref, but may depend on one. Either it will 

2015 # get popped via a _follow_chain call, or we will raise an 

2016 # error below. 

2017 continue 

2018 self._ext_refs.append(base_sha) 

2019 self._pending_ref.pop(base_sha) 

2020 for new_offset in pending: 

2021 yield from self._follow_chain(new_offset, type_num, chunks) # type: ignore[arg-type] 

2022 

2023 self._ensure_no_pending() 

2024 

2025 def _result(self, unpacked: UnpackedObject) -> T: 

2026 raise NotImplementedError 

2027 

2028 def _resolve_object( 

2029 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2030 ) -> UnpackedObject: 

2031 assert self._file is not None 

2032 self._file.seek(offset) 

2033 unpacked, _ = unpack_object( 

2034 self._file.read, 

2035 include_comp=self._include_comp, 

2036 compute_crc32=self._compute_crc32, 

2037 ) 

2038 unpacked.offset = offset 

2039 if base_chunks is None: 

2040 assert unpacked.pack_type_num == obj_type_num 

2041 else: 

2042 assert unpacked.pack_type_num in DELTA_TYPES 

2043 unpacked.obj_type_num = obj_type_num 

2044 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks) 

2045 return unpacked 

2046 

2047 def _follow_chain( 

2048 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2049 ) -> Iterator[T]: 

2050 # Unlike PackData.get_object_at, there is no need to cache offsets as 

2051 # this approach by design inflates each object exactly once. 

2052 todo = [(offset, obj_type_num, base_chunks)] 

2053 while todo: 

2054 (offset, obj_type_num, base_chunks) = todo.pop() 

2055 unpacked = self._resolve_object(offset, obj_type_num, base_chunks) 

2056 yield self._result(unpacked) 

2057 

2058 assert unpacked.offset is not None 

2059 unblocked = chain( 

2060 self._pending_ofs.pop(unpacked.offset, []), 

2061 self._pending_ref.pop(unpacked.sha(), []), 

2062 ) 

2063 todo.extend( 

2064 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore 

2065 for new_offset in unblocked 

2066 ) 

2067 

2068 def __iter__(self) -> Iterator[T]: 

2069 """Iterate over objects in the pack.""" 

2070 return self._walk_all_chains() 

2071 

2072 def ext_refs(self) -> list[bytes]: 

2073 """Return external references.""" 

2074 return self._ext_refs 

2075 

2076 

2077class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]): 

2078 """Delta chain iterator that yield unpacked objects.""" 

2079 

2080 def _result(self, unpacked: UnpackedObject) -> UnpackedObject: 

2081 """Return the unpacked object. 

2082 

2083 Args: 

2084 unpacked: The unpacked object 

2085 

2086 Returns: 

2087 The unpacked object unchanged 

2088 """ 

2089 return unpacked 

2090 

2091 

2092class PackIndexer(DeltaChainIterator[PackIndexEntry]): 

2093 """Delta chain iterator that yields index entries.""" 

2094 

2095 _compute_crc32 = True 

2096 

2097 def _result(self, unpacked: UnpackedObject) -> tuple[bytes, int, Optional[int]]: 

2098 """Convert unpacked object to pack index entry. 

2099 

2100 Args: 

2101 unpacked: The unpacked object 

2102 

2103 Returns: 

2104 Tuple of (sha, offset, crc32) for index entry 

2105 """ 

2106 assert unpacked.offset is not None 

2107 return unpacked.sha(), unpacked.offset, unpacked.crc32 

2108 

2109 

2110class PackInflater(DeltaChainIterator[ShaFile]): 

2111 """Delta chain iterator that yields ShaFile objects.""" 

2112 

2113 def _result(self, unpacked: UnpackedObject) -> ShaFile: 

2114 """Convert unpacked object to ShaFile. 

2115 

2116 Args: 

2117 unpacked: The unpacked object 

2118 

2119 Returns: 

2120 ShaFile object from the unpacked data 

2121 """ 

2122 return unpacked.sha_file() 

2123 

2124 

2125class SHA1Reader(BinaryIO): 

2126 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2127 

2128 def __init__(self, f: IO[bytes]) -> None: 

2129 """Initialize SHA1Reader. 

2130 

2131 Args: 

2132 f: File-like object to wrap 

2133 """ 

2134 self.f = f 

2135 self.sha1 = sha1(b"") 

2136 

2137 def read(self, size: int = -1) -> bytes: 

2138 """Read bytes and update SHA1. 

2139 

2140 Args: 

2141 size: Number of bytes to read, -1 for all 

2142 

2143 Returns: 

2144 Bytes read from file 

2145 """ 

2146 data = self.f.read(size) 

2147 self.sha1.update(data) 

2148 return data 

2149 

2150 def check_sha(self, allow_empty: bool = False) -> None: 

2151 """Check if the SHA1 matches the expected value. 

2152 

2153 Args: 

2154 allow_empty: Allow empty SHA1 hash 

2155 

2156 Raises: 

2157 ChecksumMismatch: If SHA1 doesn't match 

2158 """ 

2159 stored = self.f.read(20) 

2160 # If git option index.skipHash is set the index will be empty 

2161 if stored != self.sha1.digest() and ( 

2162 not allow_empty 

2163 or sha_to_hex(stored) != b"0000000000000000000000000000000000000000" 

2164 ): 

2165 raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored)) 

2166 

2167 def close(self) -> None: 

2168 """Close the underlying file.""" 

2169 return self.f.close() 

2170 

2171 def tell(self) -> int: 

2172 """Return current file position.""" 

2173 return self.f.tell() 

2174 

2175 # BinaryIO abstract methods 

2176 def readable(self) -> bool: 

2177 """Check if file is readable.""" 

2178 return True 

2179 

2180 def writable(self) -> bool: 

2181 """Check if file is writable.""" 

2182 return False 

2183 

2184 def seekable(self) -> bool: 

2185 """Check if file is seekable.""" 

2186 return getattr(self.f, "seekable", lambda: False)() 

2187 

2188 def seek(self, offset: int, whence: int = 0) -> int: 

2189 """Seek to position in file. 

2190 

2191 Args: 

2192 offset: Position offset 

2193 whence: Reference point (0=start, 1=current, 2=end) 

2194 

2195 Returns: 

2196 New file position 

2197 """ 

2198 return self.f.seek(offset, whence) 

2199 

2200 def flush(self) -> None: 

2201 """Flush the file buffer.""" 

2202 if hasattr(self.f, "flush"): 

2203 self.f.flush() 

2204 

2205 def readline(self, size: int = -1) -> bytes: 

2206 """Read a line from the file. 

2207 

2208 Args: 

2209 size: Maximum bytes to read 

2210 

2211 Returns: 

2212 Line read from file 

2213 """ 

2214 return self.f.readline(size) 

2215 

2216 def readlines(self, hint: int = -1) -> list[bytes]: 

2217 """Read all lines from the file. 

2218 

2219 Args: 

2220 hint: Approximate number of bytes to read 

2221 

2222 Returns: 

2223 List of lines 

2224 """ 

2225 return self.f.readlines(hint) 

2226 

2227 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2228 """Write multiple lines to the file (not supported).""" 

2229 raise UnsupportedOperation("writelines") 

2230 

2231 def write(self, data: bytes, /) -> int: # type: ignore[override] 

2232 """Write data to the file (not supported).""" 

2233 raise UnsupportedOperation("write") 

2234 

2235 def __enter__(self) -> "SHA1Reader": 

2236 """Enter context manager.""" 

2237 return self 

2238 

2239 def __exit__( 

2240 self, 

2241 type: Optional[type], 

2242 value: Optional[BaseException], 

2243 traceback: Optional[TracebackType], 

2244 ) -> None: 

2245 """Exit context manager and close file.""" 

2246 self.close() 

2247 

2248 def __iter__(self) -> "SHA1Reader": 

2249 """Return iterator for reading file lines.""" 

2250 return self 

2251 

2252 def __next__(self) -> bytes: 

2253 """Get next line from file. 

2254 

2255 Returns: 

2256 Next line 

2257 

2258 Raises: 

2259 StopIteration: When no more lines 

2260 """ 

2261 line = self.readline() 

2262 if not line: 

2263 raise StopIteration 

2264 return line 

2265 

2266 def fileno(self) -> int: 

2267 """Return file descriptor number.""" 

2268 return self.f.fileno() 

2269 

2270 def isatty(self) -> bool: 

2271 """Check if file is a terminal.""" 

2272 return getattr(self.f, "isatty", lambda: False)() 

2273 

2274 def truncate(self, size: Optional[int] = None) -> int: 

2275 """Not supported for read-only file. 

2276 

2277 Raises: 

2278 UnsupportedOperation: Always raised 

2279 """ 

2280 raise UnsupportedOperation("truncate") 

2281 

2282 

2283class SHA1Writer(BinaryIO): 

2284 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2285 

2286 def __init__(self, f: Union[BinaryIO, IO[bytes]]) -> None: 

2287 """Initialize SHA1Writer. 

2288 

2289 Args: 

2290 f: File-like object to wrap 

2291 """ 

2292 self.f = f 

2293 self.length = 0 

2294 self.sha1 = sha1(b"") 

2295 self.digest: Optional[bytes] = None 

2296 

2297 def write(self, data: Union[bytes, bytearray, memoryview], /) -> int: # type: ignore[override] 

2298 """Write data and update SHA1. 

2299 

2300 Args: 

2301 data: Data to write 

2302 

2303 Returns: 

2304 Number of bytes written 

2305 """ 

2306 self.sha1.update(data) 

2307 written = self.f.write(data) 

2308 self.length += written 

2309 return written 

2310 

2311 def write_sha(self) -> bytes: 

2312 """Write the SHA1 digest to the file. 

2313 

2314 Returns: 

2315 The SHA1 digest bytes 

2316 """ 

2317 sha = self.sha1.digest() 

2318 assert len(sha) == 20 

2319 self.f.write(sha) 

2320 self.length += len(sha) 

2321 return sha 

2322 

2323 def close(self) -> None: 

2324 """Close the pack file and finalize the SHA.""" 

2325 self.digest = self.write_sha() 

2326 self.f.close() 

2327 

2328 def offset(self) -> int: 

2329 """Get the total number of bytes written. 

2330 

2331 Returns: 

2332 Total bytes written 

2333 """ 

2334 return self.length 

2335 

2336 def tell(self) -> int: 

2337 """Return current file position.""" 

2338 return self.f.tell() 

2339 

2340 # BinaryIO abstract methods 

2341 def readable(self) -> bool: 

2342 """Check if file is readable.""" 

2343 return False 

2344 

2345 def writable(self) -> bool: 

2346 """Check if file is writable.""" 

2347 return True 

2348 

2349 def seekable(self) -> bool: 

2350 """Check if file is seekable.""" 

2351 return getattr(self.f, "seekable", lambda: False)() 

2352 

2353 def seek(self, offset: int, whence: int = 0) -> int: 

2354 """Seek to position in file. 

2355 

2356 Args: 

2357 offset: Position offset 

2358 whence: Reference point (0=start, 1=current, 2=end) 

2359 

2360 Returns: 

2361 New file position 

2362 """ 

2363 return self.f.seek(offset, whence) 

2364 

2365 def flush(self) -> None: 

2366 """Flush the file buffer.""" 

2367 if hasattr(self.f, "flush"): 

2368 self.f.flush() 

2369 

2370 def readline(self, size: int = -1) -> bytes: 

2371 """Not supported for write-only file. 

2372 

2373 Raises: 

2374 UnsupportedOperation: Always raised 

2375 """ 

2376 raise UnsupportedOperation("readline") 

2377 

2378 def readlines(self, hint: int = -1) -> list[bytes]: 

2379 """Not supported for write-only file. 

2380 

2381 Raises: 

2382 UnsupportedOperation: Always raised 

2383 """ 

2384 raise UnsupportedOperation("readlines") 

2385 

2386 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2387 """Write multiple lines to the file. 

2388 

2389 Args: 

2390 lines: Iterable of lines to write 

2391 """ 

2392 for line in lines: 

2393 self.write(line) 

2394 

2395 def read(self, size: int = -1) -> bytes: 

2396 """Not supported for write-only file. 

2397 

2398 Raises: 

2399 UnsupportedOperation: Always raised 

2400 """ 

2401 raise UnsupportedOperation("read") 

2402 

2403 def __enter__(self) -> "SHA1Writer": 

2404 """Enter context manager.""" 

2405 return self 

2406 

2407 def __exit__( 

2408 self, 

2409 type: Optional[type], 

2410 value: Optional[BaseException], 

2411 traceback: Optional[TracebackType], 

2412 ) -> None: 

2413 """Exit context manager and close file.""" 

2414 self.close() 

2415 

2416 def __iter__(self) -> "SHA1Writer": 

2417 """Return iterator.""" 

2418 return self 

2419 

2420 def __next__(self) -> bytes: 

2421 """Not supported for write-only file. 

2422 

2423 Raises: 

2424 UnsupportedOperation: Always raised 

2425 """ 

2426 raise UnsupportedOperation("__next__") 

2427 

2428 def fileno(self) -> int: 

2429 """Return file descriptor number.""" 

2430 return self.f.fileno() 

2431 

2432 def isatty(self) -> bool: 

2433 """Check if file is a terminal.""" 

2434 return getattr(self.f, "isatty", lambda: False)() 

2435 

2436 def truncate(self, size: Optional[int] = None) -> int: 

2437 """Not supported for write-only file. 

2438 

2439 Raises: 

2440 UnsupportedOperation: Always raised 

2441 """ 

2442 raise UnsupportedOperation("truncate") 

2443 
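# Usage sketch (illustrative only; not part of the original source): wrap an
# in-memory buffer, write payload bytes through the wrapper, then append the
# trailing SHA-1 the same way the pack and index writers in this module do.
#
#     buf = BytesIO()
#     w = SHA1Writer(buf)
#     w.write(b"example payload")   # hash is updated as data is written
#     digest = w.write_sha()        # 20-byte SHA-1, also appended to buf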

2444 

2445def pack_object_header( 

2446 type_num: int, delta_base: Optional[Union[bytes, int]], size: int 

2447) -> bytearray: 

2448 """Create a pack object header for the given object info. 

2449 

2450 Args: 

2451 type_num: Numeric type of the object. 

2452 delta_base: Delta base offset or ref, or None for whole objects. 

2453 size: Uncompressed object size. 

2454 Returns: A header for a packed object. 

2455 """ 

2456 header = [] 

2457 c = (type_num << 4) | (size & 15) 

2458 size >>= 4 

2459 while size: 

2460 header.append(c | 0x80) 

2461 c = size & 0x7F 

2462 size >>= 7 

2463 header.append(c) 

2464 if type_num == OFS_DELTA: 

2465 assert isinstance(delta_base, int) 

2466 ret = [delta_base & 0x7F] 

2467 delta_base >>= 7 

2468 while delta_base: 

2469 delta_base -= 1 

2470 ret.insert(0, 0x80 | (delta_base & 0x7F)) 

2471 delta_base >>= 7 

2472 header.extend(ret) 

2473 elif type_num == REF_DELTA: 

2474 assert isinstance(delta_base, bytes) 

2475 assert len(delta_base) == 20 

2476 header += delta_base 

2477 return bytearray(header) 

2478 
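# Worked examples (values computed by hand from the encoding above; 3 is the
# blob type number in git's object model):
#
#     pack_object_header(3, None, 100)
#     # -> 0xB4 0x06: the first byte packs the type and the low 4 size bits,
#     #    with the MSB flagging that more size bytes follow.
#
#     pack_object_header(OFS_DELTA, 1000, 50)
#     # -> 0xE2 0x03 0x86 0x68: type/size varint followed by the base offset
#     #    1000 in git's big-endian, "minus one per continuation" varint form.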

2479 

2480def pack_object_chunks( 

2481 type: int, 

2482 object: Union[list[bytes], tuple[Union[bytes, int], list[bytes]]], 

2483 compression_level: int = -1, 

2484) -> Iterator[bytes]: 

2485 """Generate chunks for a pack object. 

2486 

2487 Args: 

2488 type: Numeric type of the object 

2489 object: Object to write 

2490 compression_level: the zlib compression level 

2491 Returns: Chunks 

2492 """ 

2493 if type in DELTA_TYPES: 

2494 if isinstance(object, tuple): 

2495 delta_base, object = object 

2496 else: 

2497 raise TypeError("Delta types require a tuple of (delta_base, object)") 

2498 else: 

2499 delta_base = None 

2500 

2501 # Convert object to list of bytes chunks 

2502 if isinstance(object, bytes): 

2503 chunks = [object] 

2504 elif isinstance(object, list): 

2505 chunks = object 

2506 elif isinstance(object, ShaFile): 

2507 chunks = object.as_raw_chunks() 

2508 else: 

2509 # Shouldn't reach here with proper typing 

2510 raise TypeError(f"Unexpected object type: {object.__class__.__name__}") 

2511 

2512 yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks)))) 

2513 compressor = zlib.compressobj(level=compression_level) 

2514 for data in chunks: 

2515 yield compressor.compress(data) 

2516 yield compressor.flush() 

2517 

2518 

2519def write_pack_object( 

2520 write: Callable[[bytes], int], 

2521 type: int, 

2522 object: Union[list[bytes], tuple[Union[bytes, int], list[bytes]]], 

2523 sha: Optional["HashObject"] = None, 

2524 compression_level: int = -1, 

2525) -> int: 

2526 """Write pack object to a file. 

2527 

2528 Args: 

2529 write: Write function to use 

2530 type: Numeric type of the object 

2531 object: Object to write 

2532 sha: Optional SHA-1 hasher to update 

2533 compression_level: the zlib compression level 

2534 Returns: CRC32 checksum of the written object 

2535 """ 

2536 crc32 = 0 

2537 for chunk in pack_object_chunks(type, object, compression_level=compression_level): 

2538 write(chunk) 

2539 if sha is not None: 

2540 sha.update(chunk) 

2541 crc32 = binascii.crc32(chunk, crc32) 

2542 return crc32 & 0xFFFFFFFF 

2543 
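# Usage sketch (illustrative; `blob` stands for any ShaFile and is an
# assumption here): write one full, non-delta object while also feeding a
# running pack checksum.
#
#     buf = BytesIO()
#     pack_sha = sha1()
#     crc = write_pack_object(buf.write, blob.type_num, blob.as_raw_chunks(),
#                             sha=pack_sha)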

2544 

2545def write_pack( 

2546 filename: str, 

2547 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2548 *, 

2549 deltify: Optional[bool] = None, 

2550 delta_window_size: Optional[int] = None, 

2551 compression_level: int = -1, 

2552) -> tuple[bytes, bytes]: 

2553 """Write a new pack data file. 

2554 

2555 Args: 

2556 filename: Path to the new pack file (without .pack extension) 

2557 objects: Objects to write to the pack 

2558 delta_window_size: Delta window size 

2559 deltify: Whether to deltify pack objects 

2560 compression_level: the zlib compression level 

2561 Returns: Tuple with checksum of pack file and index file 

2562 """ 

2563 with GitFile(filename + ".pack", "wb") as f: 

2564 entries, data_sum = write_pack_objects( 

2565 f, 

2566 objects, 

2567 delta_window_size=delta_window_size, 

2568 deltify=deltify, 

2569 compression_level=compression_level, 

2570 ) 

2571 entries_list = sorted([(k, v[0], v[1]) for (k, v) in entries.items()]) 

2572 with GitFile(filename + ".idx", "wb") as f: 

2573 idx_sha = write_pack_index(f, entries_list, data_sum) 

2574 return data_sum, idx_sha 

2575 
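# Usage sketch (illustrative; the basename and object list are assumptions):
#
#     pack_sha, idx_sha = write_pack(
#         "objects/pack/pack-new",       # writes pack-new.pack and pack-new.idx
#         [(blob, None), (tree, None)],
#         deltify=False,
#     )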

2576 

2577def pack_header_chunks(num_objects: int) -> Iterator[bytes]: 

2578 """Yield chunks for a pack header.""" 

2579 yield b"PACK" # Pack header 

2580 yield struct.pack(b">L", 2) # Pack version 

2581 yield struct.pack(b">L", num_objects) # Number of objects in pack 

2582 

2583 

2584def write_pack_header( 

2585 write: Union[Callable[[bytes], int], IO[bytes]], num_objects: int 

2586) -> None: 

2587 """Write a pack header for the given number of objects.""" 

2588 write_fn: Callable[[bytes], int] 

2589 if hasattr(write, "write"): 

2590 write_fn = write.write 

2591 warnings.warn( 

2592 "write_pack_header() now takes a write rather than file argument", 

2593 DeprecationWarning, 

2594 stacklevel=2, 

2595 ) 

2596 else: 

2597 write_fn = write 

2598 for chunk in pack_header_chunks(num_objects): 

2599 write_fn(chunk) 

2600 
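# Worked example (derived from pack_header_chunks() above): every pack starts
# with a fixed 12-byte header.
#
#     b"".join(pack_header_chunks(3))
#     # -> b"PACK" + b"\x00\x00\x00\x02" (version 2) + b"\x00\x00\x00\x03"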

2601 

2602def find_reusable_deltas( 

2603 container: PackedObjectContainer, 

2604 object_ids: Set[bytes], 

2605 *, 

2606 other_haves: Optional[Set[bytes]] = None, 

2607 progress: Optional[Callable[..., None]] = None, 

2608) -> Iterator[UnpackedObject]: 

2609 """Find deltas in a pack that can be reused. 

2610 

2611 Args: 

2612 container: Pack container to search for deltas 

2613 object_ids: Set of object IDs to find deltas for 

2614 other_haves: Set of other object IDs we have 

2615 progress: Optional progress reporting callback 

2616 

2617 Returns: 

2618 Iterator of UnpackedObject entries that can be reused 

2619 """ 

2620 if other_haves is None: 

2621 other_haves = set() 

2622 reused = 0 

2623 for i, unpacked in enumerate( 

2624 container.iter_unpacked_subset( 

2625 object_ids, allow_missing=True, convert_ofs_delta=True 

2626 ) 

2627 ): 

2628 if progress is not None and i % 1000 == 0: 

2629 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode()) 

2630 if unpacked.pack_type_num == REF_DELTA: 

2631 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore 

2632 if hexsha in object_ids or hexsha in other_haves: 

2633 yield unpacked 

2634 reused += 1 

2635 if progress is not None: 

2636 progress((f"found {reused} deltas to reuse\n").encode()) 

2637 

2638 

2639def deltify_pack_objects( 

2640 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[bytes]]]], 

2641 *, 

2642 window_size: Optional[int] = None, 

2643 progress: Optional[Callable[..., None]] = None, 

2644) -> Iterator[UnpackedObject]: 

2645 """Generate deltas for pack objects. 

2646 

2647 Args: 

2648 objects: An iterable of (object, path) tuples to deltify. 

2649 window_size: Window size; None for default 

2650 progress: Optional progress reporting callback 

2651 Returns: Iterator over UnpackedObject entries, where delta_base is

2652 None for full-text entries

2653 """ 

2654 

2655 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, Optional[bytes]]]]: 

2656 for e in objects: 

2657 if isinstance(e, ShaFile): 

2658 yield (e, (e.type_num, None)) 

2659 else: 

2660 yield (e[0], (e[0].type_num, e[1])) 

2661 

2662 sorted_objs = sort_objects_for_delta(objects_with_hints()) 

2663 yield from deltas_from_sorted_objects( 

2664 sorted_objs, 

2665 window_size=window_size, 

2666 progress=progress, 

2667 ) 

2668 

2669 

2670def sort_objects_for_delta( 

2671 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[PackHint]]]], 

2672) -> Iterator[tuple[ShaFile, Optional[bytes]]]: 

2673 """Sort objects for optimal delta compression. 

2674 

2675 Args: 

2676 objects: Iterator of objects or (object, hint) tuples 

2677 

2678 Returns: 

2679 Iterator of sorted (ShaFile, path) tuples 

2680 """ 

2681 magic = [] 

2682 for entry in objects: 

2683 if isinstance(entry, tuple): 

2684 obj, hint = entry 

2685 if hint is None: 

2686 type_num = None 

2687 path = None 

2688 else: 

2689 (type_num, path) = hint 

2690 else: 

2691 obj = entry 

2692 type_num = None 

2693 path = None 

2694 magic.append((type_num, path, -obj.raw_length(), obj)) 

2695 # Build a list of objects ordered by the magic Linus heuristic 

2696 # This helps us find good objects to diff against

2697 magic.sort() 

2698 return ((x[3], x[1]) for x in magic) 

2699 
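# Illustration (hypothetical values): the sort key built above is
# (type_num, path, -raw_length, obj), so objects are grouped by type and path
# hint, and within a group the largest object sorts first, which tends to make
# it a good delta base for its smaller neighbours.
#
#     (3, b"README", -2048, blob_a) < (3, b"README", -512, blob_b)   # True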

2700 

2701def deltas_from_sorted_objects( 

2702 objects: Iterator[tuple[ShaFile, Optional[bytes]]], 

2703 window_size: Optional[int] = None, 

2704 progress: Optional[Callable[..., None]] = None, 

2705) -> Iterator[UnpackedObject]: 

2706 """Create deltas from sorted objects. 

2707 

2708 Args: 

2709 objects: Iterator of sorted objects to deltify 

2710 window_size: Delta window size; None for default 

2711 progress: Optional progress reporting callback 

2712 

2713 Returns: 

2714 Iterator of UnpackedObject entries 

2715 """ 

2716 # TODO(jelmer): Use threads 

2717 if window_size is None: 

2718 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE 

2719 

2720 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque() 

2721 for i, (o, path) in enumerate(objects): 

2722 if progress is not None and i % 1000 == 0: 

2723 progress((f"generating deltas: {i}\r").encode()) 

2724 raw = o.as_raw_chunks() 

2725 winner = raw 

2726 winner_len = sum(map(len, winner)) 

2727 winner_base = None 

2728 for base_id, base_type_num, base in possible_bases: 

2729 if base_type_num != o.type_num: 

2730 continue 

2731 delta_len = 0 

2732 delta = [] 

2733 for chunk in create_delta(b"".join(base), b"".join(raw)): 

2734 delta_len += len(chunk) 

2735 if delta_len >= winner_len: 

2736 break 

2737 delta.append(chunk) 

2738 else: 

2739 winner_base = base_id 

2740 winner = delta 

2741 winner_len = sum(map(len, winner)) 

2742 yield UnpackedObject( 

2743 o.type_num, 

2744 sha=o.sha().digest(), 

2745 delta_base=winner_base, 

2746 decomp_len=winner_len, 

2747 decomp_chunks=winner, 

2748 ) 

2749 possible_bases.appendleft((o.sha().digest(), o.type_num, raw)) 

2750 while len(possible_bases) > window_size: 

2751 possible_bases.pop() 

2752 

2753 

2754def pack_objects_to_data( 

2755 objects: Union[ 

2756 Sequence[ShaFile], 

2757 Sequence[tuple[ShaFile, Optional[bytes]]], 

2758 Sequence[tuple[ShaFile, Optional[PackHint]]], 

2759 ], 

2760 *, 

2761 deltify: Optional[bool] = None, 

2762 delta_window_size: Optional[int] = None, 

2763 ofs_delta: bool = True, 

2764 progress: Optional[Callable[..., None]] = None, 

2765) -> tuple[int, Iterator[UnpackedObject]]: 

2766 """Create pack data from objects. 

2767 

2768 Args: 

2769 objects: Pack objects 

2770 deltify: Whether to deltify pack objects 

2771 delta_window_size: Delta window size 

2772 ofs_delta: Whether to use offset deltas 

2773 progress: Optional progress reporting callback 

2774 Returns: Tuple of (number of objects, iterator over UnpackedObject entries)

2775 """ 

2776 # TODO(jelmer): support deltaifying 

2777 count = len(objects) 

2778 if deltify is None: 

2779 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2780 # slow at the moment. 

2781 deltify = False 

2782 if deltify: 

2783 return ( 

2784 count, 

2785 deltify_pack_objects( 

2786 iter(objects), # type: ignore 

2787 window_size=delta_window_size, 

2788 progress=progress, 

2789 ), 

2790 ) 

2791 else: 

2792 

2793 def iter_without_path() -> Iterator[UnpackedObject]: 

2794 for o in objects: 

2795 if isinstance(o, tuple): 

2796 yield full_unpacked_object(o[0]) 

2797 else: 

2798 yield full_unpacked_object(o) 

2799 

2800 return (count, iter_without_path()) 

2801 

2802 

2803def generate_unpacked_objects( 

2804 container: PackedObjectContainer, 

2805 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2806 delta_window_size: Optional[int] = None, 

2807 deltify: Optional[bool] = None, 

2808 reuse_deltas: bool = True, 

2809 ofs_delta: bool = True, 

2810 other_haves: Optional[set[bytes]] = None, 

2811 progress: Optional[Callable[..., None]] = None, 

2812) -> Iterator[UnpackedObject]: 

2813 """Create pack data from objects. 

2814 

2815 Returns: Iterator over UnpackedObject entries

2816 """ 

2817 todo = dict(object_ids) 

2818 if reuse_deltas: 

2819 for unpack in find_reusable_deltas( 

2820 container, set(todo), other_haves=other_haves, progress=progress 

2821 ): 

2822 del todo[sha_to_hex(unpack.sha())] 

2823 yield unpack 

2824 if deltify is None: 

2825 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2826 # slow at the moment. 

2827 deltify = False 

2828 if deltify: 

2829 objects_to_delta = container.iterobjects_subset( 

2830 todo.keys(), allow_missing=False 

2831 ) 

2832 sorted_objs = sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta) 

2833 yield from deltas_from_sorted_objects( 

2834 sorted_objs, 

2835 window_size=delta_window_size, 

2836 progress=progress, 

2837 ) 

2838 else: 

2839 for oid in todo: 

2840 yield full_unpacked_object(container[oid]) 

2841 

2842 

2843def full_unpacked_object(o: ShaFile) -> UnpackedObject: 

2844 """Create an UnpackedObject from a ShaFile. 

2845 

2846 Args: 

2847 o: ShaFile object to convert 

2848 

2849 Returns: 

2850 UnpackedObject with full object data 

2851 """ 

2852 return UnpackedObject( 

2853 o.type_num, 

2854 delta_base=None, 

2855 crc32=None, 

2856 decomp_chunks=o.as_raw_chunks(), 

2857 sha=o.sha().digest(), 

2858 ) 

2859 

2860 

2861def write_pack_from_container( 

2862 write: Union[ 

2863 Callable[[bytes], None], 

2864 Callable[[Union[bytes, bytearray, memoryview]], int], 

2865 IO[bytes], 

2866 ], 

2867 container: PackedObjectContainer, 

2868 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2869 delta_window_size: Optional[int] = None, 

2870 deltify: Optional[bool] = None, 

2871 reuse_deltas: bool = True, 

2872 compression_level: int = -1, 

2873 other_haves: Optional[set[bytes]] = None, 

2874) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

2875 """Write a new pack data file. 

2876 

2877 Args: 

2878 write: write function to use 

2879 container: PackedObjectContainer 

2880 object_ids: Sequence of (object_id, hint) tuples to write 

2881 delta_window_size: Sliding window size for searching for deltas; 

2882 Set to None for default window size. 

2883 deltify: Whether to deltify objects 

2884 reuse_deltas: Whether to reuse existing deltas 

2885 compression_level: the zlib compression level to use 

2886 other_haves: Set of additional object IDs the receiver has 

2887 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2888 """ 

2889 pack_contents_count = len(object_ids) 

2890 pack_contents = generate_unpacked_objects( 

2891 container, 

2892 object_ids, 

2893 delta_window_size=delta_window_size, 

2894 deltify=deltify, 

2895 reuse_deltas=reuse_deltas, 

2896 other_haves=other_haves, 

2897 ) 

2898 

2899 return write_pack_data( 

2900 write, 

2901 pack_contents, 

2902 num_records=pack_contents_count, 

2903 compression_level=compression_level, 

2904 ) 

2905 

2906 

2907def write_pack_objects( 

2908 write: Union[Callable[[bytes], None], IO[bytes]], 

2909 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2910 *, 

2911 delta_window_size: Optional[int] = None, 

2912 deltify: Optional[bool] = None, 

2913 compression_level: int = -1, 

2914) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

2915 """Write a new pack data file. 

2916 

2917 Args: 

2918 write: write function to use 

2919 objects: Sequence of (object, path) tuples to write 

2920 delta_window_size: Sliding window size for searching for deltas; 

2921 Set to None for default window size. 

2922 deltify: Whether to deltify objects 

2923 compression_level: the zlib compression level to use 

2924 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2925 """ 

2926 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify) 

2927 

2928 return write_pack_data( 

2929 write, 

2930 pack_contents, 

2931 num_records=pack_contents_count, 

2932 compression_level=compression_level, 

2933 ) 

2934 

2935 

2936class PackChunkGenerator: 

2937 """Generator for pack data chunks.""" 

2938 

2939 def __init__( 

2940 self, 

2941 num_records: Optional[int] = None, 

2942 records: Optional[Iterator[UnpackedObject]] = None, 

2943 progress: Optional[Callable[..., None]] = None, 

2944 compression_level: int = -1, 

2945 reuse_compressed: bool = True, 

2946 ) -> None: 

2947 """Initialize PackChunkGenerator. 

2948 

2949 Args: 

2950 num_records: Expected number of records 

2951 records: Iterator of pack records 

2952 progress: Optional progress callback 

2953 compression_level: Compression level (-1 for default) 

2954 reuse_compressed: Whether to reuse compressed chunks 

2955 """ 

2956 self.cs = sha1(b"") 

2957 self.entries: dict[bytes, tuple[int, int]] = {} 

2958 if records is None: 

2959 records = iter([]) # Empty iterator if None 

2960 self._it = self._pack_data_chunks( 

2961 records=records, 

2962 num_records=num_records, 

2963 progress=progress, 

2964 compression_level=compression_level, 

2965 reuse_compressed=reuse_compressed, 

2966 ) 

2967 

2968 def sha1digest(self) -> bytes: 

2969 """Return the SHA1 digest of the pack data.""" 

2970 return self.cs.digest() 

2971 

2972 def __iter__(self) -> Iterator[bytes]: 

2973 """Iterate over pack data chunks.""" 

2974 return self._it 

2975 

2976 def _pack_data_chunks( 

2977 self, 

2978 records: Iterator[UnpackedObject], 

2979 *, 

2980 num_records: Optional[int] = None, 

2981 progress: Optional[Callable[..., None]] = None, 

2982 compression_level: int = -1, 

2983 reuse_compressed: bool = True, 

2984 ) -> Iterator[bytes]: 

2985 """Iterate pack data file chunks. 

2986 

2987 Args: 

2988 records: Iterator over UnpackedObject 

2989 num_records: Number of records (defaults to len(records) if not specified) 

2990 progress: Function to report progress to 

2991 compression_level: the zlib compression level 

2992 reuse_compressed: Whether to reuse compressed chunks 

2993 Yields: Chunks of pack data; per-object (offset, crc32) entries are recorded in self.entries

2994 """ 

2995 # Write the pack 

2996 if num_records is None: 

2997 num_records = len(records) # type: ignore 

2998 offset = 0 

2999 for chunk in pack_header_chunks(num_records): 

3000 yield chunk 

3001 self.cs.update(chunk) 

3002 offset += len(chunk) 

3003 actual_num_records = 0 

3004 for i, unpacked in enumerate(records): 

3005 type_num = unpacked.pack_type_num 

3006 if progress is not None and i % 1000 == 0: 

3007 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii")) 

3008 raw: Union[list[bytes], tuple[int, list[bytes]], tuple[bytes, list[bytes]]] 

3009 if unpacked.delta_base is not None: 

3010 assert isinstance(unpacked.delta_base, bytes), ( 

3011 f"Expected bytes, got {type(unpacked.delta_base)}" 

3012 ) 

3013 try: 

3014 base_offset, _base_crc32 = self.entries[unpacked.delta_base] 

3015 except KeyError: 

3016 type_num = REF_DELTA 

3017 assert isinstance(unpacked.delta_base, bytes) 

3018 raw = (unpacked.delta_base, unpacked.decomp_chunks) 

3019 else: 

3020 type_num = OFS_DELTA 

3021 raw = (offset - base_offset, unpacked.decomp_chunks) 

3022 else: 

3023 raw = unpacked.decomp_chunks 

3024 chunks: Union[list[bytes], Iterator[bytes]] 

3025 if unpacked.comp_chunks is not None and reuse_compressed: 

3026 chunks = unpacked.comp_chunks 

3027 else: 

3028 chunks = pack_object_chunks( 

3029 type_num, raw, compression_level=compression_level 

3030 ) 

3031 crc32 = 0 

3032 object_size = 0 

3033 for chunk in chunks: 

3034 yield chunk 

3035 crc32 = binascii.crc32(chunk, crc32) 

3036 self.cs.update(chunk) 

3037 object_size += len(chunk) 

3038 actual_num_records += 1 

3039 self.entries[unpacked.sha()] = (offset, crc32) 

3040 offset += object_size 

3041 if actual_num_records != num_records: 

3042 raise AssertionError( 

3043 f"actual records written differs: {actual_num_records} != {num_records}" 

3044 ) 

3045 

3046 yield self.cs.digest() 

3047 

3048 

3049def write_pack_data( 

3050 write: Union[ 

3051 Callable[[bytes], None], 

3052 Callable[[Union[bytes, bytearray, memoryview]], int], 

3053 IO[bytes], 

3054 ], 

3055 records: Iterator[UnpackedObject], 

3056 *, 

3057 num_records: Optional[int] = None, 

3058 progress: Optional[Callable[..., None]] = None, 

3059 compression_level: int = -1, 

3060) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

3061 """Write a new pack data file. 

3062 

3063 Args: 

3064 write: Write function to use 

3065 num_records: Number of records (defaults to len(records) if None) 

3066 records: Iterator over UnpackedObject records to write

3067 progress: Function to report progress to 

3068 compression_level: the zlib compression level 

3069 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3070 """ 

3071 chunk_generator = PackChunkGenerator( 

3072 num_records=num_records, 

3073 records=records, 

3074 progress=progress, 

3075 compression_level=compression_level, 

3076 ) 

3077 for chunk in chunk_generator: 

3078 if callable(write): 

3079 write(chunk) 

3080 else: 

3081 write.write(chunk) 

3082 return chunk_generator.entries, chunk_generator.sha1digest() 

3083 
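# Usage sketch (illustrative; `records` is an assumed list of UnpackedObject):
# stream records into an in-memory buffer and collect the per-object offsets.
#
#     buf = BytesIO()
#     entries, pack_sha = write_pack_data(
#         buf.write, iter(records), num_records=len(records)
#     )
#     # entries maps binary sha -> (offset, crc32); pack_sha is the trailer.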

3084 

3085def write_pack_index_v1( 

3086 f: IO[bytes], 

3087 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3088 pack_checksum: bytes, 

3089) -> bytes: 

3090 """Write a new pack index file. 

3091 

3092 Args: 

3093 f: A file-like object to write to 

3094 entries: List of tuples with object name (sha), offset_in_pack, 

3095 and crc32_checksum. 

3096 pack_checksum: Checksum of the pack file. 

3097 Returns: The SHA of the written index file 

3098 """ 

3099 f = SHA1Writer(f) 

3100 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3101 for name, _offset, _entry_checksum in entries: 

3102 fan_out_table[ord(name[:1])] += 1 

3103 # Fan-out table 

3104 for i in range(0x100): 

3105 f.write(struct.pack(">L", fan_out_table[i])) 

3106 fan_out_table[i + 1] += fan_out_table[i] 

3107 for name, offset, _entry_checksum in entries: 

3108 if not (offset <= 0xFFFFFFFF): 

3109 raise TypeError("pack format 1 only supports offsets < 2Gb") 

3110 f.write(struct.pack(">L20s", offset, name)) 

3111 assert len(pack_checksum) == 20 

3112 f.write(pack_checksum) 

3113 return f.write_sha() 

3114 

3115 

3116def _delta_encode_size(size: int) -> bytes: 

3117 ret = bytearray() 

3118 c = size & 0x7F 

3119 size >>= 7 

3120 while size: 

3121 ret.append(c | 0x80) 

3122 c = size & 0x7F 

3123 size >>= 7 

3124 ret.append(c) 

3125 return bytes(ret) 

3126 
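# Worked examples (computed from the function above): sizes are stored as
# little-endian groups of 7 bits, with the MSB marking continuation.
#
#     _delta_encode_size(100)    # -> b"\x64"
#     _delta_encode_size(1000)   # -> b"\xe8\x07"  (0x68 + (7 << 7) == 1000)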

3127 

3128# The length of delta compression copy operations in version 2 packs is limited 

3129# to 64K. To copy more, we use several copy operations. Version 3 packs allow 

3130# 24-bit lengths in copy operations, but we always make version 2 packs. 

3131_MAX_COPY_LEN = 0xFFFF 

3132 

3133 

3134def _encode_copy_operation(start: int, length: int) -> bytes: 

3135 scratch = bytearray([0x80]) 

3136 for i in range(4): 

3137 if start & 0xFF << i * 8: 

3138 scratch.append((start >> i * 8) & 0xFF) 

3139 scratch[0] |= 1 << i 

3140 for i in range(2): 

3141 if length & 0xFF << i * 8: 

3142 scratch.append((length >> i * 8) & 0xFF) 

3143 scratch[0] |= 1 << (4 + i) 

3144 return bytes(scratch) 

3145 
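# Worked example (computed from the function above): copy 1000 bytes starting
# at offset 300 of the base buffer.
#
#     _encode_copy_operation(300, 1000)
#     # -> 0xB3 0x2C 0x01 0xE8 0x03: the command byte's high bit means "copy",
#     #    and its low bits flag which little-endian offset (0x012C) and
#     #    length (0x03E8) bytes follow.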

3146 

3147def create_delta(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3148 """Use python difflib to work out how to transform base_buf to target_buf. 

3149 

3150 Args: 

3151 base_buf: Base buffer 

3152 target_buf: Target buffer 

3153 """ 

3154 if isinstance(base_buf, list): 

3155 base_buf = b"".join(base_buf) 

3156 if isinstance(target_buf, list): 

3157 target_buf = b"".join(target_buf) 

3158 assert isinstance(base_buf, bytes) 

3159 assert isinstance(target_buf, bytes) 

3160 # write delta header 

3161 yield _delta_encode_size(len(base_buf)) 

3162 yield _delta_encode_size(len(target_buf)) 

3163 # write out delta opcodes 

3164 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf) 

3165 for opcode, i1, i2, j1, j2 in seq.get_opcodes(): 

3166 # Git patch opcodes don't care about deletes! 

3167 # if opcode == 'replace' or opcode == 'delete': 

3168 # pass 

3169 if opcode == "equal": 

3170 # If they are equal, unpacker will use data from base_buf 

3171 # Write out an opcode that says what range to use 

3172 copy_start = i1 

3173 copy_len = i2 - i1 

3174 while copy_len > 0: 

3175 to_copy = min(copy_len, _MAX_COPY_LEN) 

3176 yield _encode_copy_operation(copy_start, to_copy) 

3177 copy_start += to_copy 

3178 copy_len -= to_copy 

3179 if opcode == "replace" or opcode == "insert": 

3180 # If we are replacing a range or adding one, then we just 

3181 # output it to the stream (prefixed by its size) 

3182 s = j2 - j1 

3183 o = j1 

3184 while s > 127: 

3185 yield bytes([127]) 

3186 yield bytes(memoryview(target_buf)[o : o + 127]) 

3187 s -= 127 

3188 o += 127 

3189 yield bytes([s]) 

3190 yield bytes(memoryview(target_buf)[o : o + s]) 

3191 
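# Usage sketch (illustrative buffers): build a delta that rewrites `base`
# into `target`, joining the yielded chunks into a single delta buffer.
#
#     base = b"the quick brown fox"
#     target = b"the quick brown fox jumps"
#     delta = b"".join(create_delta(base, target))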

3192 

3193def apply_delta( 

3194 src_buf: Union[bytes, list[bytes]], delta: Union[bytes, list[bytes]] 

3195) -> list[bytes]: 

3196 """Based on the similar function in git's patch-delta.c. 

3197 

3198 Args: 

3199 src_buf: Source buffer 

3200 delta: Delta instructions 

3201 """ 

3202 if not isinstance(src_buf, bytes): 

3203 src_buf = b"".join(src_buf) 

3204 if not isinstance(delta, bytes): 

3205 delta = b"".join(delta) 

3206 out = [] 

3207 index = 0 

3208 delta_length = len(delta) 

3209 

3210 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]: 

3211 size = 0 

3212 i = 0 

3213 while delta: 

3214 cmd = ord(delta[index : index + 1]) 

3215 index += 1 

3216 size |= (cmd & ~0x80) << i 

3217 i += 7 

3218 if not cmd & 0x80: 

3219 break 

3220 return size, index 

3221 

3222 src_size, index = get_delta_header_size(delta, index) 

3223 dest_size, index = get_delta_header_size(delta, index) 

3224 if src_size != len(src_buf): 

3225 raise ApplyDeltaError( 

3226 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}" 

3227 ) 

3228 while index < delta_length: 

3229 cmd = ord(delta[index : index + 1]) 

3230 index += 1 

3231 if cmd & 0x80: 

3232 cp_off = 0 

3233 for i in range(4): 

3234 if cmd & (1 << i): 

3235 x = ord(delta[index : index + 1]) 

3236 index += 1 

3237 cp_off |= x << (i * 8) 

3238 cp_size = 0 

3239 # Version 3 packs can contain copy sizes larger than 64K. 

3240 for i in range(3): 

3241 if cmd & (1 << (4 + i)): 

3242 x = ord(delta[index : index + 1]) 

3243 index += 1 

3244 cp_size |= x << (i * 8) 

3245 if cp_size == 0: 

3246 cp_size = 0x10000 

3247 if ( 

3248 cp_off + cp_size < cp_size 

3249 or cp_off + cp_size > src_size 

3250 or cp_size > dest_size 

3251 ): 

3252 break 

3253 out.append(src_buf[cp_off : cp_off + cp_size]) 

3254 elif cmd != 0: 

3255 out.append(delta[index : index + cmd]) 

3256 index += cmd 

3257 else: 

3258 raise ApplyDeltaError("Invalid opcode 0") 

3259 

3260 if index != delta_length: 

3261 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}") 

3262 

3263 if dest_size != chunks_length(out): 

3264 raise ApplyDeltaError("dest size incorrect") 

3265 

3266 return out 

3267 
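# Round-trip sketch (illustrative; reuses the `base`, `target` and `delta`
# buffers from the create_delta example above):
#
#     restored = b"".join(apply_delta(base, delta))
#     assert restored == target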

3268 

3269def write_pack_index_v2( 

3270 f: IO[bytes], 

3271 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3272 pack_checksum: bytes, 

3273) -> bytes: 

3274 """Write a new pack index file. 

3275 

3276 Args: 

3277 f: File-like object to write to 

3278 entries: List of tuples with object name (sha), offset_in_pack, and 

3279 crc32_checksum. 

3280 pack_checksum: Checksum of the pack file. 

3281 Returns: The SHA of the index file written 

3282 """ 

3283 f = SHA1Writer(f) 

3284 f.write(b"\377tOc") # Magic! 

3285 f.write(struct.pack(">L", 2)) 

3286 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3287 for name, offset, entry_checksum in entries: 

3288 fan_out_table[ord(name[:1])] += 1 

3289 # Fan-out table 

3290 largetable: list[int] = [] 

3291 for i in range(0x100): 

3292 f.write(struct.pack(b">L", fan_out_table[i])) 

3293 fan_out_table[i + 1] += fan_out_table[i] 

3294 for name, offset, entry_checksum in entries: 

3295 f.write(name) 

3296 for name, offset, entry_checksum in entries: 

3297 f.write(struct.pack(b">L", entry_checksum)) 

3298 for name, offset, entry_checksum in entries: 

3299 if offset < 2**31: 

3300 f.write(struct.pack(b">L", offset)) 

3301 else: 

3302 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3303 largetable.append(offset) 

3304 for offset in largetable: 

3305 f.write(struct.pack(b">Q", offset)) 

3306 assert len(pack_checksum) == 20 

3307 f.write(pack_checksum) 

3308 return f.write_sha() 

3309 

3310 

3311def write_pack_index_v3( 

3312 f: IO[bytes], 

3313 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3314 pack_checksum: bytes, 

3315 hash_algorithm: int = 1, 

3316) -> bytes: 

3317 """Write a new pack index file in v3 format. 

3318 

3319 Args: 

3320 f: File-like object to write to 

3321 entries: List of tuples with object name (sha), offset_in_pack, and 

3322 crc32_checksum. 

3323 pack_checksum: Checksum of the pack file. 

3324 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

3325 Returns: The SHA of the index file written 

3326 """ 

3327 if hash_algorithm == 1: 

3328 hash_size = 20 # SHA-1 

3329 writer_cls = SHA1Writer 

3330 elif hash_algorithm == 2: 

3331 hash_size = 32 # SHA-256 

3332 # TODO: Add SHA256Writer when SHA-256 support is implemented 

3333 raise NotImplementedError("SHA-256 support not yet implemented") 

3334 else: 

3335 raise ValueError(f"Unknown hash algorithm {hash_algorithm}") 

3336 

3337 # Convert entries to list to allow multiple iterations 

3338 entries_list = list(entries) 

3339 

3340 # Calculate shortest unambiguous prefix length for object names 

3341 # For now, use full hash size (this could be optimized) 

3342 shortened_oid_len = hash_size 

3343 

3344 f = writer_cls(f) 

3345 f.write(b"\377tOc") # Magic! 

3346 f.write(struct.pack(">L", 3)) # Version 3 

3347 f.write(struct.pack(">L", hash_algorithm)) # Hash algorithm 

3348 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length 

3349 

3350 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3351 for name, offset, entry_checksum in entries_list: 

3352 if len(name) != hash_size: 

3353 raise ValueError( 

3354 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3355 ) 

3356 fan_out_table[ord(name[:1])] += 1 

3357 

3358 # Fan-out table 

3359 largetable: list[int] = [] 

3360 for i in range(0x100): 

3361 f.write(struct.pack(b">L", fan_out_table[i])) 

3362 fan_out_table[i + 1] += fan_out_table[i] 

3363 

3364 # Object names table 

3365 for name, offset, entry_checksum in entries_list: 

3366 f.write(name) 

3367 

3368 # CRC32 checksums table 

3369 for name, offset, entry_checksum in entries_list: 

3370 f.write(struct.pack(b">L", entry_checksum)) 

3371 

3372 # Offset table 

3373 for name, offset, entry_checksum in entries_list: 

3374 if offset < 2**31: 

3375 f.write(struct.pack(b">L", offset)) 

3376 else: 

3377 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3378 largetable.append(offset) 

3379 

3380 # Large offset table 

3381 for offset in largetable: 

3382 f.write(struct.pack(b">Q", offset)) 

3383 

3384 assert len(pack_checksum) == hash_size, ( 

3385 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}" 

3386 ) 

3387 f.write(pack_checksum) 

3388 return f.write_sha() 

3389 

3390 

3391def write_pack_index( 

3392 f: IO[bytes], 

3393 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3394 pack_checksum: bytes, 

3395 progress: Optional[Callable[..., None]] = None, 

3396 version: Optional[int] = None, 

3397) -> bytes: 

3398 """Write a pack index file. 

3399 

3400 Args: 

3401 f: File-like object to write to. 

3402 entries: List of (checksum, offset, crc32) tuples 

3403 pack_checksum: Checksum of the pack file. 

3404 progress: Progress function (not currently used) 

3405 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION. 

3406 

3407 Returns: 

3408 SHA of the written index file 

3409 """ 

3410 if version is None: 

3411 version = DEFAULT_PACK_INDEX_VERSION 

3412 

3413 if version == 1: 

3414 return write_pack_index_v1(f, entries, pack_checksum) 

3415 elif version == 2: 

3416 return write_pack_index_v2(f, entries, pack_checksum) 

3417 elif version == 3: 

3418 return write_pack_index_v3(f, entries, pack_checksum) 

3419 else: 

3420 raise ValueError(f"Unsupported pack index version: {version}") 

3421 

3422 
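# Illustrative sketch (not original source): write_pack_index dispatches on the
# requested version and falls back to DEFAULT_PACK_INDEX_VERSION when none is
# given. The single entry and checksum below are made-up placeholder bytes, and
# the helper name is hypothetical.
def _example_write_pack_index_versions() -> None:
    from io import BytesIO

    entries = [(b"\x42" * 20, 12, 0)]
    write_pack_index(BytesIO(), entries, b"\x00" * 20, version=2)  # explicit v2
    write_pack_index(BytesIO(), entries, b"\x00" * 20)  # module default version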

3423class Pack: 

3424 """A Git pack object.""" 

3425 

3426 _data_load: Optional[Callable[[], PackData]] 

3427 _idx_load: Optional[Callable[[], PackIndex]] 

3428 

3429 _data: Optional[PackData] 

3430 _idx: Optional[PackIndex] 

3431 

3432 def __init__( 

3433 self, 

3434 basename: str, 

3435 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

3436 *, 

3437 delta_window_size: Optional[int] = None, 

3438 window_memory: Optional[int] = None, 

3439 delta_cache_size: Optional[int] = None, 

3440 depth: Optional[int] = None, 

3441 threads: Optional[int] = None, 

3442 big_file_threshold: Optional[int] = None, 

3443 ) -> None: 

3444 """Initialize a Pack object. 

3445 

3446 Args: 

3447 basename: Base path for pack files (without .pack/.idx extension) 

3448 resolve_ext_ref: Optional function to resolve external references 

3449 delta_window_size: Size of the delta compression window 

3450 window_memory: Memory limit for delta compression window 

3451 delta_cache_size: Size of the delta cache 

3452 depth: Maximum depth for delta chains 

3453 threads: Number of threads to use for operations 

3454 big_file_threshold: Size threshold for big file handling 

3455 """ 

3456 self._basename = basename 

3457 self._data = None 

3458 self._idx = None 

3459 self._idx_path = self._basename + ".idx" 

3460 self._data_path = self._basename + ".pack" 

3461 self.delta_window_size = delta_window_size 

3462 self.window_memory = window_memory 

3463 self.delta_cache_size = delta_cache_size 

3464 self.depth = depth 

3465 self.threads = threads 

3466 self.big_file_threshold = big_file_threshold 

3467 self._data_load = lambda: PackData( 

3468 self._data_path, 

3469 delta_window_size=delta_window_size, 

3470 window_memory=window_memory, 

3471 delta_cache_size=delta_cache_size, 

3472 depth=depth, 

3473 threads=threads, 

3474 big_file_threshold=big_file_threshold, 

3475 ) 

3476 self._idx_load = lambda: load_pack_index(self._idx_path) 

3477 self.resolve_ext_ref = resolve_ext_ref 

3478 

3479 @classmethod 

3480 def from_lazy_objects( 

3481 cls, data_fn: Callable[[], PackData], idx_fn: Callable[[], PackIndex] 

3482 ) -> "Pack": 

3483 """Create a new pack object from callables to load pack data and index objects.""" 

3484 ret = cls("") 

3485 ret._data_load = data_fn 

3486 ret._idx_load = idx_fn 

3487 return ret 

3488 

3489 @classmethod 

3490 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack": 

3491 """Create a new pack object from pack data and index objects.""" 

3492 ret = cls("") 

3493 ret._data = data 

3494 ret._data_load = None 

3495 ret._idx = idx 

3496 ret._idx_load = None 

3497 ret.check_length_and_checksum() 

3498 return ret 

3499 
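    # Illustrative note (added, not part of the original source): a Pack can be
    # built three ways, all equivalent once loaded. The basename below is a
    # made-up placeholder path.
    #
    #     Pack("/repo/.git/objects/pack/pack-1234")      # load .pack/.idx lazily from disk
    #     Pack.from_objects(pack_data, pack_index)       # already-open PackData / PackIndex
    #     Pack.from_lazy_objects(load_data, load_index)  # callables invoked on first access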

3500 def name(self) -> bytes: 

3501 """The SHA over the SHAs of the objects in this pack.""" 

3502 return self.index.objects_sha1() 

3503 

3504 @property 

3505 def data(self) -> PackData: 

3506 """The pack data object being used.""" 

3507 if self._data is None: 

3508 assert self._data_load 

3509 self._data = self._data_load() 

3510 self.check_length_and_checksum() 

3511 return self._data 

3512 

3513 @property 

3514 def index(self) -> PackIndex: 

3515 """The index being used. 

3516 

3517 Note: This may be an in-memory index 

3518 """ 

3519 if self._idx is None: 

3520 assert self._idx_load 

3521 self._idx = self._idx_load() 

3522 return self._idx 

3523 

3524 def close(self) -> None: 

3525 """Close the pack file and index.""" 

3526 if self._data is not None: 

3527 self._data.close() 

3528 if self._idx is not None: 

3529 self._idx.close() 

3530 

3531 def __enter__(self) -> "Pack": 

3532 """Enter context manager.""" 

3533 return self 

3534 

3535 def __exit__( 

3536 self, 

3537 exc_type: Optional[type], 

3538 exc_val: Optional[BaseException], 

3539 exc_tb: Optional[TracebackType], 

3540 ) -> None: 

3541 """Exit context manager.""" 

3542 self.close() 

3543 

3544 def __eq__(self, other: object) -> bool: 

3545 """Check equality with another pack.""" 

3546 if not isinstance(other, Pack): 

3547 return False 

3548 return self.index == other.index 

3549 

3550 def __len__(self) -> int: 

3551 """Number of entries in this pack.""" 

3552 return len(self.index) 

3553 

3554 def __repr__(self) -> str: 

3555 """Return string representation of this pack.""" 

3556 return f"{self.__class__.__name__}({self._basename!r})" 

3557 

3558 def __iter__(self) -> Iterator[bytes]: 

3559 """Iterate over all the sha1s of the objects in this pack.""" 

3560 return iter(self.index) 

3561 

3562 def check_length_and_checksum(self) -> None: 

3563 """Sanity check the length and checksum of the pack index and data.""" 

3564 assert len(self.index) == len(self.data), ( 

3565 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)" 

3566 ) 

3567 idx_stored_checksum = self.index.get_pack_checksum() 

3568 data_stored_checksum = self.data.get_stored_checksum() 

3569 if ( 

3570 idx_stored_checksum is not None 

3571 and idx_stored_checksum != data_stored_checksum 

3572 ): 

3573 raise ChecksumMismatch( 

3574 sha_to_hex(idx_stored_checksum), 

3575 sha_to_hex(data_stored_checksum), 

3576 ) 

3577 

3578 def check(self) -> None: 

3579 """Check the integrity of this pack. 

3580 

3581 Raises: 

3582 ChecksumMismatch: if a checksum for the index or data is wrong 

3583 """ 

3584 self.index.check() 

3585 self.data.check() 

3586 for obj in self.iterobjects(): 

3587 obj.check() 

3588 # TODO: object connectivity checks 

3589 

3590 def get_stored_checksum(self) -> bytes: 

3591 """Return the stored checksum of the pack data.""" 

3592 return self.data.get_stored_checksum() 

3593 

3594 def pack_tuples(self) -> list[tuple[ShaFile, None]]: 

3595 """Return pack tuples for all objects in pack.""" 

3596 return [(o, None) for o in self.iterobjects()] 

3597 

3598 def __contains__(self, sha1: bytes) -> bool: 

3599 """Check whether this pack contains a particular SHA1.""" 

3600 try: 

3601 self.index.object_offset(sha1) 

3602 return True 

3603 except KeyError: 

3604 return False 

3605 

3606 def get_raw(self, sha1: bytes) -> tuple[int, bytes]: 

3607 """Get raw object data by SHA1.""" 

3608 offset = self.index.object_offset(sha1) 

3609 obj_type, obj = self.data.get_object_at(offset) 

3610 type_num, chunks = self.resolve_object(offset, obj_type, obj) 

3611 return type_num, b"".join(chunks) # type: ignore[arg-type] 

3612 

3613 def __getitem__(self, sha1: bytes) -> ShaFile: 

3614 """Retrieve the specified SHA1.""" 

3615 type, uncomp = self.get_raw(sha1) 

3616 return ShaFile.from_raw_string(type, uncomp, sha=sha1) 

3617 
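    # Illustrative usage (added, not original source); ``sha`` stands for an
    # object name (20-byte binary or 40-char hex) known to be in this pack:
    #
    #     if sha in pack:                        # __contains__: index lookup only
    #         type_num, raw = pack.get_raw(sha)  # raw type number and bytes
    #         obj = pack[sha]                    # fully parsed ShaFile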

3618 def iterobjects(self) -> Iterator[ShaFile]: 

3619 """Iterate over the objects in this pack.""" 

3620 return iter( 

3621 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref) 

3622 ) 

3623 

3624 def iterobjects_subset( 

3625 self, shas: Iterable[ObjectID], *, allow_missing: bool = False 

3626 ) -> Iterator[ShaFile]: 

3627 """Iterate over a subset of objects in this pack.""" 

3628 return ( 

3629 uo 

3630 for uo in PackInflater.for_pack_subset( 

3631 self, 

3632 shas, 

3633 allow_missing=allow_missing, 

3634 resolve_ext_ref=self.resolve_ext_ref, 

3635 ) 

3636 if uo.id in shas 

3637 ) 

3638 
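    # Illustrative usage (added, not original source): iterate either every
    # object in the pack or only a chosen subset; ``wanted`` is a made-up set
    # of object ids.
    #
    #     for obj in pack.iterobjects():
    #         ...
    #     for obj in pack.iterobjects_subset(wanted, allow_missing=True):
    #         ...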

3639 def iter_unpacked_subset( 

3640 self, 

3641 shas: Iterable[ObjectID], 

3642 *, 

3643 include_comp: bool = False, 

3644 allow_missing: bool = False, 

3645 convert_ofs_delta: bool = False, 

3646 ) -> Iterator[UnpackedObject]: 

3647 """Iterate over unpacked objects in subset.""" 

3648 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list) 

3649 ofs: dict[int, bytes] = {} 

3650 todo = set(shas) 

3651 for unpacked in self.iter_unpacked(include_comp=include_comp): 

3652 sha = unpacked.sha() 

3653 if unpacked.offset is not None: 

3654 ofs[unpacked.offset] = sha 

3655 hexsha = sha_to_hex(sha) 

3656 if hexsha in todo: 

3657 if unpacked.pack_type_num == OFS_DELTA: 

3658 assert isinstance(unpacked.delta_base, int) 

3659 assert unpacked.offset is not None 

3660 base_offset = unpacked.offset - unpacked.delta_base 

3661 try: 

3662 unpacked.delta_base = ofs[base_offset] 

3663 except KeyError: 

3664 ofs_pending[base_offset].append(unpacked) 

3665 continue 

3666 else: 

3667 unpacked.pack_type_num = REF_DELTA 

3668 yield unpacked 

3669 todo.remove(hexsha) 

3670 if unpacked.offset is not None: 

3671 for child in ofs_pending.pop(unpacked.offset, []): 

3672 child.pack_type_num = REF_DELTA 

3673 child.delta_base = sha 

3674 yield child 

3675 assert not ofs_pending 

3676 if not allow_missing and todo: 

3677 raise UnresolvedDeltas(list(todo)) 

3678 
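    # Explanatory note (added, not original): the loop above rewrites offset
    # deltas as ref deltas so callers see delta bases identified by SHA. When
    # an OFS_DELTA's base has not been scanned yet, the object is parked in
    # ofs_pending under the base's offset and emitted later, as soon as the
    # base itself has been scanned and its SHA is known.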

3679 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]: 

3680 """Iterate over all unpacked objects in this pack.""" 

3681 ofs_to_entries = { 

3682 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries() 

3683 } 

3684 for unpacked in self.data.iter_unpacked(include_comp=include_comp): 

3685 assert unpacked.offset is not None 

3686 (sha, crc32) = ofs_to_entries[unpacked.offset] 

3687 unpacked._sha = sha 

3688 unpacked.crc32 = crc32 

3689 yield unpacked 

3690 

3691 def keep(self, msg: Optional[bytes] = None) -> str: 

3692 """Add a .keep file for the pack, preventing git from garbage collecting it. 

3693 

3694 Args: 

3695 msg: A message written inside the .keep file; can be used later 

3696 to determine whether or not a .keep file is obsolete. 

3697 Returns: The path of the .keep file, as a string. 

3698 """ 

3699 keepfile_name = f"{self._basename}.keep" 

3700 with GitFile(keepfile_name, "wb") as keepfile: 

3701 if msg: 

3702 keepfile.write(msg) 

3703 keepfile.write(b"\n") 

3704 return keepfile_name 

3705 
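    # Illustrative usage (added, not original source): mark this pack so git's
    # garbage collection will not prune it; the message is free-form and
    # optional.
    #
    #     keep_path = pack.keep(b"kept while a backup is running")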

3706 def get_ref(self, sha: bytes) -> tuple[Optional[int], int, OldUnpackedObject]: 

3707 """Get the object for a ref SHA, only looking in this pack.""" 

3708 # TODO: cache these results 

3709 try: 

3710 offset = self.index.object_offset(sha) 

3711 except KeyError: 

3712 offset = None 

3713 if offset: 

3714 type, obj = self.data.get_object_at(offset) 

3715 elif self.resolve_ext_ref: 

3716 type, obj = self.resolve_ext_ref(sha) 

3717 else: 

3718 raise KeyError(sha) 

3719 return offset, type, obj 

3720 

3721 def resolve_object( 

3722 self, 

3723 offset: int, 

3724 type: int, 

3725 obj: OldUnpackedObject, 

3726 get_ref: Optional[ 

3727 Callable[[bytes], tuple[Optional[int], int, OldUnpackedObject]] 

3728 ] = None, 

3729 ) -> tuple[int, OldUnpackedObject]: 

3730 """Resolve an object, possibly resolving deltas when necessary. 

3731 

3732 Returns: Tuple with object type and contents. 

3733 """ 

3734 # Walk down the delta chain, building a stack of deltas to reach 

3735 # the requested object. 

3736 base_offset = offset 

3737 base_type = type 

3738 base_obj = obj 

3739 delta_stack = [] 

3740 while base_type in DELTA_TYPES: 

3741 prev_offset = base_offset 

3742 if get_ref is None: 

3743 get_ref = self.get_ref 

3744 if base_type == OFS_DELTA: 

3745 (delta_offset, delta) = base_obj 

3746 # TODO: clean up asserts and replace with nicer error messages 

3747 assert isinstance(delta_offset, int), ( 

3748 f"Expected int, got {delta_offset.__class__}" 

3749 ) 

3750 base_offset = base_offset - delta_offset 

3751 base_type, base_obj = self.data.get_object_at(base_offset) 

3752 assert isinstance(base_type, int) 

3753 elif base_type == REF_DELTA: 

3754 (basename, delta) = base_obj 

3755 assert isinstance(basename, bytes) and len(basename) == 20 

3756 base_offset, base_type, base_obj = get_ref(basename) # type: ignore[assignment] 

3757 assert isinstance(base_type, int) 

3758 if base_offset == prev_offset: # object is based on itself 

3759 raise UnresolvedDeltas([basename]) 

3760 delta_stack.append((prev_offset, base_type, delta)) 

3761 

3762 # Now grab the base object (mustn't be a delta) and apply the 

3763 # deltas all the way up the stack. 

3764 chunks = base_obj 

3765 for prev_offset, _delta_type, delta in reversed(delta_stack): 

3766 # Convert chunks to bytes for apply_delta if needed 

3767 if isinstance(chunks, list): 

3768 chunks_bytes = b"".join(chunks) 

3769 elif isinstance(chunks, tuple): 

3770 # For tuple type, second element is the actual data 

3771 _, chunk_data = chunks 

3772 if isinstance(chunk_data, list): 

3773 chunks_bytes = b"".join(chunk_data) 

3774 else: 

3775 chunks_bytes = chunk_data 

3776 else: 

3777 chunks_bytes = chunks 

3778 

3779 # Apply delta and get result as list 

3780 chunks = apply_delta(chunks_bytes, delta) 

3781 

3782 if prev_offset is not None: 

3783 self.data._offset_cache[prev_offset] = base_type, chunks 

3784 return base_type, chunks 

3785 
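    # Explanatory note (added, not original): resolve_object walks the delta
    # chain from the requested offset down to a non-delta base, then re-applies
    # the recorded deltas in reverse order. get_raw() above is the typical
    # caller:
    #
    #     offset = pack.index.object_offset(sha)
    #     type_num, chunks = pack.resolve_object(offset, *pack.data.get_object_at(offset))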

3786 def entries( 

3787 self, progress: Optional[Callable[[int, int], None]] = None 

3788 ) -> Iterator[PackIndexEntry]: 

3789 """Yield entries summarizing the contents of this pack. 

3790 

3791 Args: 

3792 progress: Progress function, called with current and total 

3793 object count. 

3794 Returns: iterator of tuples with (sha, offset, crc32) 

3795 """ 

3796 return self.data.iterentries( 

3797 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3798 ) 

3799 

3800 def sorted_entries( 

3801 self, progress: Optional[ProgressFn] = None 

3802 ) -> Iterator[PackIndexEntry]: 

3803 """Return entries in this pack, sorted by SHA. 

3804 

3805 Args: 

3806 progress: Progress function, called with current and total 

3807 object count 

3808 Returns: Iterator of tuples with (sha, offset, crc32) 

3809 """ 

3810 return iter( 

3811 self.data.sorted_entries( 

3812 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3813 ) 

3814 ) 

3815 

3816 def get_unpacked_object( 

3817 self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True 

3818 ) -> UnpackedObject: 

3819 """Get the unpacked object for a sha. 

3820 

3821 Args: 

3822 sha: SHA of object to fetch 

3823 include_comp: Whether to include compression data in UnpackedObject 

3824 convert_ofs_delta: Whether to convert offset deltas to ref deltas 

3825 """ 

3826 offset = self.index.object_offset(sha) 

3827 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp) 

3828 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta: 

3829 assert isinstance(unpacked.delta_base, int) 

3830 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base) 

3831 unpacked.pack_type_num = REF_DELTA 

3832 return unpacked 

3833 

3834 
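# Illustrative end-to-end sketch (not part of the original source). The
# basename and helper name are made-up placeholders; a real pack is the
# pack-<sha>.pack / pack-<sha>.idx pair under .git/objects/pack/. The sketch
# opens the pack, verifies it, walks its objects, and regenerates an index
# from the sorted entries.
def _example_read_pack(basename: str = "/tmp/example/pack-1234") -> None:
    with Pack(basename) as pack:
        pack.check()                       # checksum and per-object validation
        print(len(pack), "objects, pack id", pack.name())
        for hexsha in pack:                # the index yields hex object names
            type_num, raw = pack.get_raw(hexsha)
        with open(basename + ".idx-rebuilt", "wb") as f:
            write_pack_index(f, pack.sorted_entries(), pack.get_stored_checksum())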

3835def extend_pack( 

3836 f: BinaryIO, 

3837 object_ids: Set[ObjectID], 

3838 get_raw: Callable[[ObjectID], tuple[int, bytes]], 

3839 *, 

3840 compression_level: int = -1, 

3841 progress: Optional[Callable[[bytes], None]] = None, 

3842) -> tuple[bytes, list[tuple[bytes, int, int]]]: 

3843 """Extend a pack file with more objects. 

3844 

3845 The caller should make sure that object_ids does not contain any objects 

3846 that are already in the pack 

3847 """ 

3848 # Update the header with the new number of objects. 

3849 f.seek(0) 

3850 _version, num_objects = read_pack_header(f.read) 

3851 

3852 if object_ids: 

3853 f.seek(0) 

3854 write_pack_header(f.write, num_objects + len(object_ids)) 

3855 

3856 # Must flush before reading (http://bugs.python.org/issue3207) 

3857 f.flush() 

3858 

3859 # Rescan the rest of the pack, computing the SHA with the new header. 

3860 new_sha = compute_file_sha(f, end_ofs=-20) 

3861 

3862 # Must reposition before writing (http://bugs.python.org/issue3207) 

3863 f.seek(0, os.SEEK_CUR) 

3864 

3865 extra_entries = [] 

3866 

3867 # Complete the pack. 

3868 for i, object_id in enumerate(object_ids): 

3869 if progress is not None: 

3870 progress( 

3871 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii") 

3872 ) 

3873 assert len(object_id) == 20 

3874 type_num, data = get_raw(object_id) 

3875 offset = f.tell() 

3876 crc32 = write_pack_object( 

3877 f.write, 

3878 type_num, 

3879 [data], # Convert bytes to list[bytes] 

3880 sha=new_sha, 

3881 compression_level=compression_level, 

3882 ) 

3883 extra_entries.append((object_id, offset, crc32)) 

3884 pack_sha = new_sha.digest() 

3885 f.write(pack_sha) 

3886 return pack_sha, extra_entries 

3887 

3888 
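# Illustrative sketch (not part of the original source): appending missing base
# objects to an existing pack, e.g. to complete a thin pack. The path is a
# placeholder and fake_get_raw is a hypothetical lookup; a real caller would
# pass an object store's get_raw and the binary SHAs of the external bases that
# REF_DELTA objects still need.
def _example_extend_pack() -> None:
    def fake_get_raw(sha: bytes) -> tuple[int, bytes]:
        return (3, b"example contents")  # pretend every base is a blob (type 3)

    with open("/tmp/example/pack-1234.pack", "r+b") as f:
        pack_sha, extra_entries = extend_pack(
            f,
            {b"\x11" * 20},        # object ids to append (binary SHAs)
            fake_get_raw,
            compression_level=9,
        )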

3889try: 

3890 from dulwich._pack import ( # type: ignore 

3891 apply_delta, 

3892 bisect_find_sha, 

3893 ) 

3894except ImportError: 

3895 pass
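# Explanatory note (added, not original): the import above swaps in the
# optional C implementations of apply_delta and bisect_find_sha when the
# dulwich._pack extension is available; otherwise the pure-Python versions
# defined earlier in this module are used. A tiny hand-built delta, shown for
# illustration only (the bytes encode: source size 12, target size 7,
# "copy 5 bytes from offset 0", "insert the 2 literal bytes that follow"):
def _example_apply_delta() -> bytes:  # hypothetical helper
    source = b"hello, world"                     # 12 bytes
    delta = b"\x0c\x07\x90\x05\x02!!"            # copy b"hello", insert b"!!"
    return b"".join(apply_delta(source, delta))  # -> b"hello!!"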