Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%

1665 statements  

1# pack.py -- For dealing with packed git objects. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Classes for dealing with packed git objects. 

24 

25A pack is a compact representation of a bunch of objects, stored 

26using deltas where possible. 

27 

28A pack has two parts: the pack file, which stores the data, and an index

29that tells you where the data is.

30

31To find an object you look in each of the index files until you find a

32match for the object name. The offset obtained from the index is then

33used as a pointer into the corresponding pack file.

34""" 

35 

36import binascii 

37from collections import defaultdict, deque 

38from contextlib import suppress 

39from io import BytesIO, UnsupportedOperation 

40 

41try: 

42 from cdifflib import CSequenceMatcher as SequenceMatcher 

43except ModuleNotFoundError: 

44 from difflib import SequenceMatcher 

45 

46import os 

47import struct 

48import sys 

49import warnings 

50import zlib 

51from collections.abc import Callable, Iterable, Iterator, Sequence, Set 

52from hashlib import sha1 

53from itertools import chain 

54from os import SEEK_CUR, SEEK_END 

55from struct import unpack_from 

56from types import TracebackType 

57from typing import ( 

58 IO, 

59 TYPE_CHECKING, 

60 Any, 

61 BinaryIO, 

62 Generic, 

63 Protocol, 

64 TypeVar, 

65) 

66 

67try: 

68 import mmap 

69except ImportError: 

70 has_mmap = False 

71else: 

72 has_mmap = True 

73 

74if TYPE_CHECKING: 

75 from _hashlib import HASH as HashObject 

76 

77 from .bitmap import PackBitmap 

78 from .commit_graph import CommitGraph 

79 from .object_store import BaseObjectStore 

80 

81# For some reason the above try/except fails to set has_mmap = False on Plan 9 

82if sys.platform == "Plan9": 

83 has_mmap = False 

84 

85from . import replace_me 

86from .errors import ApplyDeltaError, ChecksumMismatch 

87from .file import GitFile, _GitFile 

88from .lru_cache import LRUSizeCache 

89from .objects import ObjectID, ShaFile, hex_to_sha, object_header, sha_to_hex 

90 

91OFS_DELTA = 6 

92REF_DELTA = 7 

93 

94DELTA_TYPES = (OFS_DELTA, REF_DELTA) 

95 

96 

97DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 

98 

99# Keep pack files under 16 MB in memory, otherwise write them out to disk 

100PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 

101 

102# Default pack index version to use when none is specified 

103DEFAULT_PACK_INDEX_VERSION = 2 

104 

105 

106OldUnpackedObject = tuple[bytes | int, list[bytes]] | list[bytes] 

107ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]] 

108ProgressFn = Callable[[int, str], None] 

109PackHint = tuple[int, bytes | None] 

110 

111 

112class UnresolvedDeltas(Exception): 

113 """Delta objects could not be resolved.""" 

114 

115 def __init__(self, shas: list[bytes]) -> None: 

116 """Initialize UnresolvedDeltas exception. 

117 

118 Args: 

119 shas: List of SHA hashes for unresolved delta objects 

120 """ 

121 self.shas = shas 

122 

123 

124class ObjectContainer(Protocol): 

125 """Protocol for objects that can contain git objects.""" 

126 

127 def add_object(self, obj: ShaFile) -> None: 

128 """Add a single object to this object store.""" 

129 

130 def add_objects( 

131 self, 

132 objects: Sequence[tuple[ShaFile, str | None]], 

133 progress: Callable[..., None] | None = None, 

134 ) -> "Pack | None": 

135 """Add a set of objects to this object store. 

136 

137 Args: 

138 objects: Iterable over a list of (object, path) tuples 

139 progress: Progress callback for object insertion 

140 Returns: Optional Pack object of the objects written. 

141 """ 

142 

143 def __contains__(self, sha1: bytes) -> bool: 

144 """Check if a hex sha is present.""" 

145 

146 def __getitem__(self, sha1: bytes) -> ShaFile: 

147 """Retrieve an object.""" 

148 

149 def get_commit_graph(self) -> "CommitGraph | None": 

150 """Get the commit graph for this object store. 

151 

152 Returns: 

153 CommitGraph object if available, None otherwise 

154 """ 

155 return None 

156 

157 

158class PackedObjectContainer(ObjectContainer): 

159 """Container for objects packed in a pack file.""" 

160 

161 def get_unpacked_object( 

162 self, sha1: bytes, *, include_comp: bool = False 

163 ) -> "UnpackedObject": 

164 """Get a raw unresolved object. 

165 

166 Args: 

167 sha1: SHA-1 hash of the object 

168 include_comp: Whether to include compressed data 

169 

170 Returns: 

171 UnpackedObject instance 

172 """ 

173 raise NotImplementedError(self.get_unpacked_object) 

174 

175 def iterobjects_subset( 

176 self, shas: Iterable[bytes], *, allow_missing: bool = False 

177 ) -> Iterator[ShaFile]: 

178 """Iterate over a subset of objects. 

179 

180 Args: 

181 shas: Iterable of object SHAs to retrieve 

182 allow_missing: If True, skip missing objects 

183 

184 Returns: 

185 Iterator of ShaFile objects 

186 """ 

187 raise NotImplementedError(self.iterobjects_subset) 

188 

189 def iter_unpacked_subset( 

190 self, 

191 shas: Iterable[bytes], 

192 *, 

193 include_comp: bool = False, 

194 allow_missing: bool = False, 

195 convert_ofs_delta: bool = True, 

196 ) -> Iterator["UnpackedObject"]: 

197 """Iterate over unpacked objects from a subset of SHAs. 

198 

199 Args: 

200 shas: Set of object SHAs to retrieve 

201 include_comp: Include compressed data if True 

202 allow_missing: If True, skip missing objects 

203 convert_ofs_delta: If True, convert offset deltas to ref deltas 

204 

205 Returns: 

206 Iterator of UnpackedObject instances 

207 """ 

208 raise NotImplementedError(self.iter_unpacked_subset) 

209 

210 

211class UnpackedObjectStream: 

212 """Abstract base class for a stream of unpacked objects.""" 

213 

214 def __iter__(self) -> Iterator["UnpackedObject"]: 

215 """Iterate over unpacked objects.""" 

216 raise NotImplementedError(self.__iter__) 

217 

218 def __len__(self) -> int: 

219 """Return the number of objects in the stream.""" 

220 raise NotImplementedError(self.__len__) 

221 

222 

223def take_msb_bytes( 

224 read: Callable[[int], bytes], crc32: int | None = None 

225) -> tuple[list[int], int | None]: 

226 """Read bytes marked with most significant bit. 

227 

228 Args: 

229 read: Read function 

230 crc32: Optional CRC32 checksum to update 

231 

232 Returns: 

233 Tuple of (list of bytes read, updated CRC32 or None) 

234 """ 

235 ret: list[int] = [] 

236 while len(ret) == 0 or ret[-1] & 0x80: 

237 b = read(1) 

238 if crc32 is not None: 

239 crc32 = binascii.crc32(b, crc32) 

240 ret.append(ord(b[:1])) 

241 return ret, crc32 
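An illustrative sketch of how the continuation bytes returned by take_msb_bytes are consumed, using the same size/type decoding that unpack_object applies later in this module (the two input bytes are fabricated for the example):

    from io import BytesIO
    from dulwich.pack import take_msb_bytes

    # 0x95 = 1001 0101: MSB set (more bytes follow), type 1 (commit), low size bits 0x5
    # 0x0A = 0000 1010: MSB clear (last byte), contributes 0x0A << 4 to the size
    raw, _ = take_msb_bytes(BytesIO(bytes([0x95, 0x0A])).read)
    type_num = (raw[0] >> 4) & 0x07            # -> 1
    size = raw[0] & 0x0F
    for i, byte in enumerate(raw[1:]):
        size += (byte & 0x7F) << ((i * 7) + 4)
    assert (type_num, size) == (1, 165)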

242 

243 

244class PackFileDisappeared(Exception): 

245 """Raised when a pack file unexpectedly disappears.""" 

246 

247 def __init__(self, obj: object) -> None: 

248 """Initialize PackFileDisappeared exception. 

249 

250 Args: 

251 obj: The object that triggered the exception 

252 """ 

253 self.obj = obj 

254 

255 

256class UnpackedObject: 

257 """Class encapsulating an object unpacked from a pack file. 

258 

259 These objects should only be created from within unpack_object. Most 

260 members start out as empty and are filled in at various points by 

261 read_zlib_chunks, unpack_object, DeltaChainIterator, etc. 

262 

263 End users of this object should take care that the function they're getting 

264 this object from is guaranteed to set the members they need. 

265 """ 

266 

267 __slots__ = [ 

268 "_sha", # Cached binary SHA. 

269 "comp_chunks", # Compressed object chunks. 

270 "crc32", # CRC32. 

271 "decomp_chunks", # Decompressed object chunks. 

272 "decomp_len", # Decompressed length of this object. 

273 "delta_base", # Delta base offset or SHA. 

274 "obj_chunks", # Decompressed and delta-resolved chunks. 

275 "obj_type_num", # Type of this object. 

276 "offset", # Offset in its pack. 

277 "pack_type_num", # Type of this object in the pack (may be a delta). 

278 ] 

279 

280 obj_type_num: int | None 

281 obj_chunks: list[bytes] | None 

282 delta_base: None | bytes | int 

283 decomp_chunks: list[bytes] 

284 comp_chunks: list[bytes] | None 

285 decomp_len: int | None 

286 crc32: int | None 

287 offset: int | None 

288 pack_type_num: int 

289 _sha: bytes | None 

290 

291 # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be 

292 # methods of this object. 

293 def __init__( 

294 self, 

295 pack_type_num: int, 

296 *, 

297 delta_base: None | bytes | int = None, 

298 decomp_len: int | None = None, 

299 crc32: int | None = None, 

300 sha: bytes | None = None, 

301 decomp_chunks: list[bytes] | None = None, 

302 offset: int | None = None, 

303 ) -> None: 

304 """Initialize an UnpackedObject. 

305 

306 Args: 

307 pack_type_num: Type number of this object in the pack 

308 delta_base: Delta base (offset or SHA) if this is a delta object 

309 decomp_len: Decompressed length of this object 

310 crc32: CRC32 checksum 

311 sha: SHA-1 hash of the object 

312 decomp_chunks: Decompressed chunks 

313 offset: Offset in the pack file 

314 """ 

315 self.offset = offset 

316 self._sha = sha 

317 self.pack_type_num = pack_type_num 

318 self.delta_base = delta_base 

319 self.comp_chunks = None 

320 self.decomp_chunks: list[bytes] = decomp_chunks or [] 

321 if decomp_chunks is not None and decomp_len is None: 

322 self.decomp_len = sum(map(len, decomp_chunks)) 

323 else: 

324 self.decomp_len = decomp_len 

325 self.crc32 = crc32 

326 

327 if pack_type_num in DELTA_TYPES: 

328 self.obj_type_num = None 

329 self.obj_chunks = None 

330 else: 

331 self.obj_type_num = pack_type_num 

332 self.obj_chunks = self.decomp_chunks 

333 self.delta_base = delta_base 

334 

335 def sha(self) -> bytes: 

336 """Return the binary SHA of this object.""" 

337 if self._sha is None: 

338 assert self.obj_type_num is not None and self.obj_chunks is not None 

339 self._sha = obj_sha(self.obj_type_num, self.obj_chunks) 

340 return self._sha 

341 

342 def sha_file(self) -> ShaFile: 

343 """Return a ShaFile from this object.""" 

344 assert self.obj_type_num is not None and self.obj_chunks is not None 

345 return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks) 

346 

347 # Only provided for backwards compatibility with code that expects either 

348 # chunks or a delta tuple. 

349 def _obj(self) -> OldUnpackedObject: 

350 """Return the decompressed chunks, or (delta base, delta chunks).""" 

351 if self.pack_type_num in DELTA_TYPES: 

352 assert isinstance(self.delta_base, (bytes, int)) 

353 return (self.delta_base, self.decomp_chunks) 

354 else: 

355 return self.decomp_chunks 

356 

357 def __eq__(self, other: object) -> bool: 

358 """Check equality with another UnpackedObject.""" 

359 if not isinstance(other, UnpackedObject): 

360 return False 

361 for slot in self.__slots__: 

362 if getattr(self, slot) != getattr(other, slot): 

363 return False 

364 return True 

365 

366 def __ne__(self, other: object) -> bool: 

367 """Check inequality with another UnpackedObject.""" 

368 return not (self == other) 

369 

370 def __repr__(self) -> str: 

371 """Return string representation of this UnpackedObject.""" 

372 data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__] 

373 return "{}({})".format(self.__class__.__name__, ", ".join(data)) 

374 

375 

376_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance 

377 

378 

379def read_zlib_chunks( 

380 read_some: Callable[[int], bytes], 

381 unpacked: UnpackedObject, 

382 include_comp: bool = False, 

383 buffer_size: int = _ZLIB_BUFSIZE, 

384) -> bytes: 

385 """Read zlib data from a buffer. 

386 

387 This function requires that the buffer have additional data following the 

388 compressed data, which is guaranteed to be the case for git pack files. 

389 

390 Args: 

391 read_some: Read function that returns at least one byte, but may 

392 return less than the requested size. 

393 unpacked: An UnpackedObject to write result data to. If its crc32 

394 attr is not None, the CRC32 of the compressed bytes will be computed 

395 using this starting CRC32. 

396 After this function, will have the following attrs set: 

397 * comp_chunks (if include_comp is True) 

398 * decomp_chunks 

399 * decomp_len 

400 * crc32 

401 include_comp: If True, include compressed data in the result. 

402 buffer_size: Size of the read buffer. 

403 Returns: Leftover unused data from the decompression. 

404 

405 Raises: 

406 zlib.error: if a decompression error occurred. 

407 """ 

408 if unpacked.decomp_len is None or unpacked.decomp_len <= -1: 

409 raise ValueError("non-negative zlib data stream size expected") 

410 decomp_obj = zlib.decompressobj() 

411 

412 comp_chunks = [] 

413 decomp_chunks = unpacked.decomp_chunks 

414 decomp_len = 0 

415 crc32 = unpacked.crc32 

416 

417 while True: 

418 add = read_some(buffer_size) 

419 if not add: 

420 raise zlib.error("EOF before end of zlib stream") 

421 comp_chunks.append(add) 

422 decomp = decomp_obj.decompress(add) 

423 decomp_len += len(decomp) 

424 decomp_chunks.append(decomp) 

425 unused = decomp_obj.unused_data 

426 if unused: 

427 left = len(unused) 

428 if crc32 is not None: 

429 crc32 = binascii.crc32(add[:-left], crc32) 

430 if include_comp: 

431 comp_chunks[-1] = add[:-left] 

432 break 

433 elif crc32 is not None: 

434 crc32 = binascii.crc32(add, crc32) 

435 if crc32 is not None: 

436 crc32 &= 0xFFFFFFFF 

437 

438 if decomp_len != unpacked.decomp_len: 

439 raise zlib.error("decompressed data does not match expected size") 

440 

441 unpacked.crc32 = crc32 

442 if include_comp: 

443 unpacked.comp_chunks = comp_chunks 

444 return unused 
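A small self-contained sketch of read_zlib_chunks in action; the UnpackedObject is constructed directly here purely for demonstration, and the 20 zero bytes stand in for whatever follows the compressed stream in a real pack (the next object or the trailer):

    import zlib
    from io import BytesIO
    from dulwich.pack import UnpackedObject, read_zlib_chunks

    payload = b"hello, pack"
    buf = BytesIO(zlib.compress(payload) + b"\x00" * 20)
    unpacked = UnpackedObject(3, decomp_len=len(payload))   # 3 = blob
    leftover = read_zlib_chunks(buf.read, unpacked)
    assert b"".join(unpacked.decomp_chunks) == payload
    assert leftover == b"\x00" * 20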

445 

446 

447def iter_sha1(iter: Iterable[bytes]) -> bytes: 

448 """Return the hexdigest of the SHA1 over a set of names. 

449 

450 Args: 

451 iter: Iterator over string objects 

452 Returns: 40-byte hex sha1 digest 

453 """ 

454 sha = sha1() 

455 for name in iter: 

456 sha.update(name) 

457 return sha.hexdigest().encode("ascii") 
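A one-line check of what iter_sha1 computes: it is equivalent to hashing the concatenation of the names.

    from hashlib import sha1 as _sha1
    from dulwich.pack import iter_sha1

    assert iter_sha1([b"abc", b"def"]) == _sha1(b"abcdef").hexdigest().encode("ascii")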

458 

459 

460def load_pack_index(path: str | os.PathLike[str]) -> "PackIndex": 

461 """Load an index file by path. 

462 

463 Args: 

464 path: Path to the index file 

465 Returns: A PackIndex loaded from the given path 

466 """ 

467 with GitFile(path, "rb") as f: 

468 return load_pack_index_file(path, f) 

469 

470 

471def _load_file_contents( 

472 f: IO[bytes] | _GitFile, size: int | None = None 

473) -> tuple[bytes | Any, int]: 

474 """Load contents from a file, preferring mmap when possible. 

475 

476 Args: 

477 f: File-like object to load 

478 size: Expected size, or None to determine from file 

479 Returns: Tuple of (contents, size) 

480 """ 

481 try: 

482 fd = f.fileno() 

483 except (UnsupportedOperation, AttributeError): 

484 fd = None 

485 # Attempt to use mmap if possible 

486 if fd is not None: 

487 if size is None: 

488 size = os.fstat(fd).st_size 

489 if has_mmap: 

490 try: 

491 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ) 

492 except (OSError, ValueError): 

493 # Can't mmap - perhaps a socket or invalid file descriptor 

494 pass 

495 else: 

496 return contents, size 

497 contents_bytes = f.read() 

498 size = len(contents_bytes) 

499 return contents_bytes, size 

500 

501 

502def load_pack_index_file( 

503 path: str | os.PathLike[str], f: IO[bytes] | _GitFile 

504) -> "PackIndex": 

505 """Load an index file from a file-like object. 

506 

507 Args: 

508 path: Path for the index file 

509 f: File-like object 

510 Returns: A PackIndex loaded from the given file 

511 """ 

512 contents, size = _load_file_contents(f) 

513 if contents[:4] == b"\377tOc": 

514 version = struct.unpack(b">L", contents[4:8])[0] 

515 if version == 2: 

516 return PackIndex2(path, file=f, contents=contents, size=size) 

517 elif version == 3: 

518 return PackIndex3(path, file=f, contents=contents, size=size) 

519 else: 

520 raise KeyError(f"Unknown pack index format {version}") 

521 else: 

522 return PackIndex1(path, file=f, contents=contents, size=size) 

523 

524 

525def bisect_find_sha( 

526 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes] 

527) -> int | None: 

528 """Find a SHA in a data blob with sorted SHAs. 

529 

530 Args: 

531 start: Start index of range to search 

532 end: End index of range to search 

533 sha: Sha to find 

534 unpack_name: Callback to retrieve SHA by index 

535 Returns: Index of the SHA, or None if it wasn't found 

536 """ 

537 assert start <= end 

538 while start <= end: 

539 i = (start + end) // 2 

540 file_sha = unpack_name(i) 

541 if file_sha < sha: 

542 start = i + 1 

543 elif file_sha > sha: 

544 end = i - 1 

545 else: 

546 return i 

547 return None 
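A tiny in-memory demonstration of bisect_find_sha, with a plain list standing in for the lookup callback a PackIndex provides via _unpack_name (note that end is treated as an inclusive index):

    from dulwich.pack import bisect_find_sha

    shas = [b"\x01" * 20, b"\x05" * 20, b"\x09" * 20]
    assert bisect_find_sha(0, len(shas) - 1, b"\x05" * 20, shas.__getitem__) == 1
    assert bisect_find_sha(0, len(shas) - 1, b"\x02" * 20, shas.__getitem__) is None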

548 

549 

550PackIndexEntry = tuple[bytes, int, int | None] 

551 

552 

553class PackIndex: 

554 """An index in to a packfile. 

555 

556 Given a sha id of an object a pack index can tell you the location in the 

557 packfile of that object if it has it. 

558 """ 

559 

560 # Default to SHA-1 for backward compatibility 

561 hash_algorithm = 1 

562 hash_size = 20 

563 

564 def __eq__(self, other: object) -> bool: 

565 """Check equality with another PackIndex.""" 

566 if not isinstance(other, PackIndex): 

567 return False 

568 

569 for (name1, _, _), (name2, _, _) in zip( 

570 self.iterentries(), other.iterentries() 

571 ): 

572 if name1 != name2: 

573 return False 

574 return True 

575 

576 def __ne__(self, other: object) -> bool: 

577 """Check if this pack index is not equal to another.""" 

578 return not self.__eq__(other) 

579 

580 def __len__(self) -> int: 

581 """Return the number of entries in this pack index.""" 

582 raise NotImplementedError(self.__len__) 

583 

584 def __iter__(self) -> Iterator[bytes]: 

585 """Iterate over the SHAs in this pack.""" 

586 return map(sha_to_hex, self._itersha()) 

587 

588 def iterentries(self) -> Iterator[PackIndexEntry]: 

589 """Iterate over the entries in this pack index. 

590 

591 Returns: iterator over tuples with object name, offset in packfile and 

592 crc32 checksum. 

593 """ 

594 raise NotImplementedError(self.iterentries) 

595 

596 def get_pack_checksum(self) -> bytes | None: 

597 """Return the SHA1 checksum stored for the corresponding packfile. 

598 

599 Returns: 20-byte binary digest, or None if not available 

600 """ 

601 raise NotImplementedError(self.get_pack_checksum) 

602 

603 @replace_me(since="0.21.0", remove_in="0.23.0") 

604 def object_index(self, sha: bytes) -> int: 

605 """Return the index for the given SHA. 

606 

607 Args: 

608 sha: SHA-1 hash 

609 

610 Returns: 

611 Index position 

612 """ 

613 return self.object_offset(sha) 

614 

615 def object_offset(self, sha: bytes) -> int: 

616 """Return the offset in to the corresponding packfile for the object. 

617 

618 Given the name of an object it will return the offset that object 

619 lives at within the corresponding pack file. If the pack file doesn't 

620 have the object then None will be returned. 

621 """ 

622 raise NotImplementedError(self.object_offset) 

623 

624 def object_sha1(self, index: int) -> bytes: 

625 """Return the SHA1 corresponding to the index in the pack file.""" 

626 for name, offset, _crc32 in self.iterentries(): 

627 if offset == index: 

628 return name 

629 else: 

630 raise KeyError(index) 

631 

632 def _object_offset(self, sha: bytes) -> int: 

633 """See object_offset. 

634 

635 Args: 

636 sha: A *binary* SHA string, 20 bytes long. 

637 """ 

638 raise NotImplementedError(self._object_offset) 

639 

640 def objects_sha1(self) -> bytes: 

641 """Return the hex SHA1 over all the shas of all objects in this pack. 

642 

643 Note: This is used for the filename of the pack. 

644 """ 

645 return iter_sha1(self._itersha()) 

646 

647 def _itersha(self) -> Iterator[bytes]: 

648 """Yield all the SHA1's of the objects in the index, sorted.""" 

649 raise NotImplementedError(self._itersha) 

650 

651 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

652 """Iterate over all SHA1s with the given prefix. 

653 

654 Args: 

655 prefix: Binary prefix to match 

656 Returns: Iterator of matching SHA1s 

657 """ 

658 # Default implementation for PackIndex classes that don't override 

659 for sha, _, _ in self.iterentries(): 

660 if sha.startswith(prefix): 

661 yield sha 

662 

663 def close(self) -> None: 

664 """Close any open files.""" 

665 

666 def check(self) -> None: 

667 """Check the consistency of this pack index.""" 

668 

669 

670class MemoryPackIndex(PackIndex): 

671 """Pack index that is stored entirely in memory.""" 

672 

673 def __init__( 

674 self, 

675 entries: list[tuple[bytes, int, int | None]], 

676 pack_checksum: bytes | None = None, 

677 ) -> None: 

678 """Create a new MemoryPackIndex. 

679 

680 Args: 

681 entries: Sequence of name, idx, crc32 (sorted) 

682 pack_checksum: Optional pack checksum 

683 """ 

684 self._by_sha = {} 

685 self._by_offset = {} 

686 for name, offset, _crc32 in entries: 

687 self._by_sha[name] = offset 

688 self._by_offset[offset] = name 

689 self._entries = entries 

690 self._pack_checksum = pack_checksum 

691 

692 def get_pack_checksum(self) -> bytes | None: 

693 """Return the SHA checksum stored for the corresponding packfile.""" 

694 return self._pack_checksum 

695 

696 def __len__(self) -> int: 

697 """Return the number of entries in this pack index.""" 

698 return len(self._entries) 

699 

700 def object_offset(self, sha: bytes) -> int: 

701 """Return the offset for the given SHA. 

702 

703 Args: 

704 sha: SHA to look up (binary or hex) 

705 Returns: Offset in the pack file 

706 """ 

707 if len(sha) == 40: 

708 sha = hex_to_sha(sha) 

709 return self._by_sha[sha] 

710 

711 def object_sha1(self, offset: int) -> bytes: 

712 """Return the SHA1 for the object at the given offset.""" 

713 return self._by_offset[offset] 

714 

715 def _itersha(self) -> Iterator[bytes]: 

716 """Iterate over all SHA1s in the index.""" 

717 return iter(self._by_sha) 

718 

719 def iterentries(self) -> Iterator[PackIndexEntry]: 

720 """Iterate over all index entries.""" 

721 return iter(self._entries) 

722 

723 @classmethod 

724 def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex": 

725 """Create a MemoryPackIndex from a PackData object.""" 

726 return MemoryPackIndex( 

727 list(pack_data.sorted_entries()), pack_data.get_stored_checksum() 

728 ) 

729 

730 @classmethod 

731 def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex": 

732 """Create a copy of another PackIndex in memory.""" 

733 return cls(list(other_index.iterentries()), other_index.get_pack_checksum()) 
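A minimal sketch of MemoryPackIndex with two fabricated (sha, offset, crc32) entries, sorted by SHA:

    from dulwich.pack import MemoryPackIndex

    entries = [(b"\x01" * 20, 12, None), (b"\xfe" * 20, 345, None)]
    idx = MemoryPackIndex(entries)
    assert len(idx) == 2
    assert idx.object_offset(b"\x01" * 20) == 12
    assert idx.object_sha1(345) == b"\xfe" * 20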

734 

735 

736class FilePackIndex(PackIndex): 

737 """Pack index that is based on a file. 

738 

739 To look up an object it opens the file and reads the fan-out table: 256 

740 four-byte entries indexed by the first byte of the SHA id. The entry for a 

741 given byte is the end of the group of objects whose SHAs share that first 

742 byte; the entry for the preceding byte gives the start of the group. 

743 The SHAs are sorted within the group, so the start and end offsets can be 

744 computed and the group bisected to find whether the value is 

745 present. 

746 """ 

747 

748 _fan_out_table: list[int] 

749 _file: IO[bytes] | _GitFile 

750 

751 def __init__( 

752 self, 

753 filename: str | os.PathLike[str], 

754 file: IO[bytes] | _GitFile | None = None, 

755 contents: "bytes | mmap.mmap | None" = None, 

756 size: int | None = None, 

757 ) -> None: 

758 """Create a pack index object. 

759 

760 Provide it with the name of the index file to consider, and it will map 

761 it whenever required. 

762 """ 

763 self._filename = filename 

764 # Take the size now, so it can be checked each time we map the file to 

765 # ensure that it hasn't changed. 

766 if file is None: 

767 self._file = GitFile(filename, "rb") 

768 else: 

769 self._file = file 

770 if contents is None: 

771 self._contents, self._size = _load_file_contents(self._file, size) 

772 else: 

773 self._contents = contents 

774 self._size = size if size is not None else len(contents) 

775 

776 @property 

777 def path(self) -> str: 

778 """Return the path to this index file.""" 

779 return os.fspath(self._filename) 

780 

781 def __eq__(self, other: object) -> bool: 

782 """Check equality with another FilePackIndex.""" 

783 # Quick optimization: 

784 if ( 

785 isinstance(other, FilePackIndex) 

786 and self._fan_out_table != other._fan_out_table 

787 ): 

788 return False 

789 

790 return super().__eq__(other) 

791 

792 def close(self) -> None: 

793 """Close the underlying file and any mmap.""" 

794 self._file.close() 

795 close_fn = getattr(self._contents, "close", None) 

796 if close_fn is not None: 

797 close_fn() 

798 

799 def __len__(self) -> int: 

800 """Return the number of entries in this pack index.""" 

801 return self._fan_out_table[-1] 

802 

803 def _unpack_entry(self, i: int) -> PackIndexEntry: 

804 """Unpack the i-th entry in the index file. 

805 

806 Returns: Tuple with object name (SHA), offset in pack file and CRC32 

807 checksum (if known). 

808 """ 

809 raise NotImplementedError(self._unpack_entry) 

810 

811 def _unpack_name(self, i: int) -> bytes: 

812 """Unpack the i-th name from the index file.""" 

813 raise NotImplementedError(self._unpack_name) 

814 

815 def _unpack_offset(self, i: int) -> int: 

816 """Unpack the i-th object offset from the index file.""" 

817 raise NotImplementedError(self._unpack_offset) 

818 

819 def _unpack_crc32_checksum(self, i: int) -> int | None: 

820 """Unpack the crc32 checksum for the ith object from the index file.""" 

821 raise NotImplementedError(self._unpack_crc32_checksum) 

822 

823 def _itersha(self) -> Iterator[bytes]: 

824 """Iterate over all SHA1s in the index.""" 

825 for i in range(len(self)): 

826 yield self._unpack_name(i) 

827 

828 def iterentries(self) -> Iterator[PackIndexEntry]: 

829 """Iterate over the entries in this pack index. 

830 

831 Returns: iterator over tuples with object name, offset in packfile and 

832 crc32 checksum. 

833 """ 

834 for i in range(len(self)): 

835 yield self._unpack_entry(i) 

836 

837 def _read_fan_out_table(self, start_offset: int) -> list[int]: 

838 """Read the fan-out table from the index. 

839 

840 The fan-out table contains 256 entries mapping first byte values 

841 to the number of objects with SHA1s less than or equal to that byte. 

842 

843 Args: 

844 start_offset: Offset in the file where the fan-out table starts 

845 Returns: List of 256 integers 

846 """ 

847 ret = [] 

848 for i in range(0x100): 

849 fanout_entry = self._contents[ 

850 start_offset + i * 4 : start_offset + (i + 1) * 4 

851 ] 

852 ret.append(struct.unpack(">L", fanout_entry)[0]) 

853 return ret 

854 

855 def check(self) -> None: 

856 """Check that the stored checksum matches the actual checksum.""" 

857 actual = self.calculate_checksum() 

858 stored = self.get_stored_checksum() 

859 if actual != stored: 

860 raise ChecksumMismatch(stored, actual) 

861 

862 def calculate_checksum(self) -> bytes: 

863 """Calculate the SHA1 checksum over this pack index. 

864 

865 Returns: This is a 20-byte binary digest 

866 """ 

867 return sha1(self._contents[:-20]).digest() 

868 

869 def get_pack_checksum(self) -> bytes: 

870 """Return the SHA1 checksum stored for the corresponding packfile. 

871 

872 Returns: 20-byte binary digest 

873 """ 

874 return bytes(self._contents[-40:-20]) 

875 

876 def get_stored_checksum(self) -> bytes: 

877 """Return the SHA1 checksum stored for this index. 

878 

879 Returns: 20-byte binary digest 

880 """ 

881 return bytes(self._contents[-20:]) 

882 

883 def object_offset(self, sha: bytes) -> int: 

884 """Return the offset in to the corresponding packfile for the object. 

885 

886 Given the name of an object it will return the offset that object 

887 lives at within the corresponding pack file. If the pack file doesn't 

888 have the object then None will be returned. 

889 """ 

890 if len(sha) == 40: 

891 sha = hex_to_sha(sha) 

892 try: 

893 return self._object_offset(sha) 

894 except ValueError as exc: 

895 closed = getattr(self._contents, "closed", None) 

896 if closed in (None, True): 

897 raise PackFileDisappeared(self) from exc 

898 raise 

899 

900 def _object_offset(self, sha: bytes) -> int: 

901 """See object_offset. 

902 

903 Args: 

904 sha: A *binary* SHA string, 20 bytes long. 

905 """ 

906 assert len(sha) == 20 

907 idx = ord(sha[:1]) 

908 if idx == 0: 

909 start = 0 

910 else: 

911 start = self._fan_out_table[idx - 1] 

912 end = self._fan_out_table[idx] 

913 i = bisect_find_sha(start, end, sha, self._unpack_name) 

914 if i is None: 

915 raise KeyError(sha) 

916 return self._unpack_offset(i) 

917 

918 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

919 """Iterate over all SHA1s with the given prefix.""" 

920 start = ord(prefix[:1]) 

921 if start == 0: 

922 start = 0 

923 else: 

924 start = self._fan_out_table[start - 1] 

925 end = ord(prefix[:1]) + 1 

926 if end == 0x100: 

927 end = len(self) 

928 else: 

929 end = self._fan_out_table[end] 

930 assert start <= end 

931 started = False 

932 for i in range(start, end): 

933 name: bytes = self._unpack_name(i) 

934 if name.startswith(prefix): 

935 yield name 

936 started = True 

937 elif started: 

938 break 

939 

940 

941class PackIndex1(FilePackIndex): 

942 """Version 1 Pack Index file.""" 

943 

944 def __init__( 

945 self, 

946 filename: str | os.PathLike[str], 

947 file: IO[bytes] | _GitFile | None = None, 

948 contents: bytes | None = None, 

949 size: int | None = None, 

950 ) -> None: 

951 """Initialize a version 1 pack index. 

952 

953 Args: 

954 filename: Path to the index file 

955 file: Optional file object 

956 contents: Optional mmap'd contents 

957 size: Optional size of the index 

958 """ 

959 super().__init__(filename, file, contents, size) 

960 self.version = 1 

961 self._fan_out_table = self._read_fan_out_table(0) 

962 

963 def _unpack_entry(self, i: int) -> tuple[bytes, int, None]: 

964 (offset, name) = unpack_from(">L20s", self._contents, (0x100 * 4) + (i * 24)) 

965 return (name, offset, None) 

966 

967 def _unpack_name(self, i: int) -> bytes: 

968 offset = (0x100 * 4) + (i * 24) + 4 

969 return self._contents[offset : offset + 20] 

970 

971 def _unpack_offset(self, i: int) -> int: 

972 offset = (0x100 * 4) + (i * 24) 

973 result = unpack_from(">L", self._contents, offset)[0] 

974 assert isinstance(result, int) 

975 return result 

976 

977 def _unpack_crc32_checksum(self, i: int) -> None: 

978 # Not stored in v1 index files 

979 return None 

980 

981 

982class PackIndex2(FilePackIndex): 

983 """Version 2 Pack Index file.""" 

984 

985 def __init__( 

986 self, 

987 filename: str | os.PathLike[str], 

988 file: IO[bytes] | _GitFile | None = None, 

989 contents: bytes | None = None, 

990 size: int | None = None, 

991 ) -> None: 

992 """Initialize a version 2 pack index. 

993 

994 Args: 

995 filename: Path to the index file 

996 file: Optional file object 

997 contents: Optional mmap'd contents 

998 size: Optional size of the index 

999 """ 

1000 super().__init__(filename, file, contents, size) 

1001 if self._contents[:4] != b"\377tOc": 

1002 raise AssertionError("Not a v2 pack index file") 

1003 (self.version,) = unpack_from(b">L", self._contents, 4) 

1004 if self.version != 2: 

1005 raise AssertionError(f"Version was {self.version}") 

1006 self._fan_out_table = self._read_fan_out_table(8) 

1007 self._name_table_offset = 8 + 0x100 * 4 

1008 self._crc32_table_offset = self._name_table_offset + 20 * len(self) 

1009 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1010 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1011 self 

1012 ) 

1013 

1014 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1015 return ( 

1016 self._unpack_name(i), 

1017 self._unpack_offset(i), 

1018 self._unpack_crc32_checksum(i), 

1019 ) 

1020 

1021 def _unpack_name(self, i: int) -> bytes: 

1022 offset = self._name_table_offset + i * 20 

1023 return self._contents[offset : offset + 20] 

1024 

1025 def _unpack_offset(self, i: int) -> int: 

1026 offset_pos = self._pack_offset_table_offset + i * 4 

1027 offset = unpack_from(">L", self._contents, offset_pos)[0] 

1028 assert isinstance(offset, int) 

1029 if offset & (2**31): 

1030 large_offset_pos = ( 

1031 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1032 ) 

1033 offset = unpack_from(">Q", self._contents, large_offset_pos)[0] 

1034 assert isinstance(offset, int) 

1035 return offset 

1036 

1037 def _unpack_crc32_checksum(self, i: int) -> int: 

1038 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1039 assert isinstance(result, int) 

1040 return result 

1041 

1042 

1043class PackIndex3(FilePackIndex): 

1044 """Version 3 Pack Index file. 

1045 

1046 Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes). 

1047 """ 

1048 

1049 def __init__( 

1050 self, 

1051 filename: str | os.PathLike[str], 

1052 file: IO[bytes] | _GitFile | None = None, 

1053 contents: bytes | None = None, 

1054 size: int | None = None, 

1055 ) -> None: 

1056 """Initialize a version 3 pack index. 

1057 

1058 Args: 

1059 filename: Path to the index file 

1060 file: Optional file object 

1061 contents: Optional mmap'd contents 

1062 size: Optional size of the index 

1063 """ 

1064 super().__init__(filename, file, contents, size) 

1065 if self._contents[:4] != b"\377tOc": 

1066 raise AssertionError("Not a v3 pack index file") 

1067 (self.version,) = unpack_from(b">L", self._contents, 4) 

1068 if self.version != 3: 

1069 raise AssertionError(f"Version was {self.version}") 

1070 

1071 # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1072 (self.hash_algorithm,) = unpack_from(b">L", self._contents, 8) 

1073 if self.hash_algorithm == 1: 

1074 self.hash_size = 20 # SHA-1 

1075 elif self.hash_algorithm == 2: 

1076 self.hash_size = 32 # SHA-256 

1077 else: 

1078 raise AssertionError(f"Unknown hash algorithm {self.hash_algorithm}") 

1079 

1080 # Read length of shortened object names 

1081 (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12) 

1082 

1083 # Calculate offsets based on variable hash size 

1084 self._fan_out_table = self._read_fan_out_table( 

1085 16 

1086 ) # After header (4 + 4 + 4 + 4) 

1087 self._name_table_offset = 16 + 0x100 * 4 

1088 self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self) 

1089 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1090 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1091 self 

1092 ) 

1093 

1094 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1095 return ( 

1096 self._unpack_name(i), 

1097 self._unpack_offset(i), 

1098 self._unpack_crc32_checksum(i), 

1099 ) 

1100 

1101 def _unpack_name(self, i: int) -> bytes: 

1102 offset = self._name_table_offset + i * self.hash_size 

1103 return self._contents[offset : offset + self.hash_size] 

1104 

1105 def _unpack_offset(self, i: int) -> int: 

1106 offset_pos = self._pack_offset_table_offset + i * 4 

1107 offset = unpack_from(">L", self._contents, offset_pos)[0] 

1108 assert isinstance(offset, int) 

1109 if offset & (2**31): 

1110 large_offset_pos = ( 

1111 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1112 ) 

1113 offset = unpack_from(">Q", self._contents, large_offset_pos)[0] 

1114 assert isinstance(offset, int) 

1115 return offset 

1116 

1117 def _unpack_crc32_checksum(self, i: int) -> int: 

1118 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1119 assert isinstance(result, int) 

1120 return result 

1121 

1122 

1123def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]: 

1124 """Read the header of a pack file. 

1125 

1126 Args: 

1127 read: Read function 

1128 Returns: Tuple of (pack version, number of objects). Raises 

1129 AssertionError if the header is missing or malformed. 

1130 """ 

1131 header = read(12) 

1132 if not header: 

1133 raise AssertionError("file too short to contain pack") 

1134 if header[:4] != b"PACK": 

1135 raise AssertionError(f"Invalid pack header {header!r}") 

1136 (version,) = unpack_from(b">L", header, 4) 

1137 if version not in (2, 3): 

1138 raise AssertionError(f"Version was {version}") 

1139 (num_objects,) = unpack_from(b">L", header, 8) 

1140 return (version, num_objects) 
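A quick sketch of read_pack_header on a hand-built 12-byte header (magic, version 2, zero objects):

    import struct
    from io import BytesIO
    from dulwich.pack import read_pack_header

    header = b"PACK" + struct.pack(">LL", 2, 0)
    assert read_pack_header(BytesIO(header).read) == (2, 0)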

1141 

1142 

1143def chunks_length(chunks: bytes | Iterable[bytes]) -> int: 

1144 """Get the total length of a sequence of chunks. 

1145 

1146 Args: 

1147 chunks: Either a single bytes object or an iterable of bytes 

1148 Returns: Total length in bytes 

1149 """ 

1150 if isinstance(chunks, bytes): 

1151 return len(chunks) 

1152 else: 

1153 return sum(map(len, chunks)) 

1154 

1155 

1156def unpack_object( 

1157 read_all: Callable[[int], bytes], 

1158 read_some: Callable[[int], bytes] | None = None, 

1159 compute_crc32: bool = False, 

1160 include_comp: bool = False, 

1161 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1162) -> tuple[UnpackedObject, bytes]: 

1163 """Unpack a Git object. 

1164 

1165 Args: 

1166 read_all: Read function that blocks until the number of requested 

1167 bytes are read. 

1168 read_some: Read function that returns at least one byte, but may not 

1169 return the number of bytes requested. 

1170 compute_crc32: If True, compute the CRC32 of the compressed data. If 

1171 False, the returned CRC32 will be None. 

1172 include_comp: If True, include compressed data in the result. 

1173 zlib_bufsize: An optional buffer size for zlib operations. 

1174 Returns: A tuple of (unpacked, unused), where unused is the unused data 

1175 leftover from decompression, and unpacked is an UnpackedObject with 

1176 the following attrs set: 

1177 

1178 * obj_chunks (for non-delta types) 

1179 * pack_type_num 

1180 * delta_base (for delta types) 

1181 * comp_chunks (if include_comp is True) 

1182 * decomp_chunks 

1183 * decomp_len 

1184 * crc32 (if compute_crc32 is True) 

1185 """ 

1186 if read_some is None: 

1187 read_some = read_all 

1188 if compute_crc32: 

1189 crc32 = 0 

1190 else: 

1191 crc32 = None 

1192 

1193 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1194 type_num = (raw[0] >> 4) & 0x07 

1195 size = raw[0] & 0x0F 

1196 for i, byte in enumerate(raw[1:]): 

1197 size += (byte & 0x7F) << ((i * 7) + 4) 

1198 

1199 delta_base: int | bytes | None 

1200 raw_base = len(raw) 

1201 if type_num == OFS_DELTA: 

1202 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1203 raw_base += len(raw) 

1204 if raw[-1] & 0x80: 

1205 raise AssertionError 

1206 delta_base_offset = raw[0] & 0x7F 

1207 for byte in raw[1:]: 

1208 delta_base_offset += 1 

1209 delta_base_offset <<= 7 

1210 delta_base_offset += byte & 0x7F 

1211 delta_base = delta_base_offset 

1212 elif type_num == REF_DELTA: 

1213 delta_base_obj = read_all(20) 

1214 if crc32 is not None: 

1215 crc32 = binascii.crc32(delta_base_obj, crc32) 

1216 delta_base = delta_base_obj 

1217 raw_base += 20 

1218 else: 

1219 delta_base = None 

1220 

1221 unpacked = UnpackedObject( 

1222 type_num, delta_base=delta_base, decomp_len=size, crc32=crc32 

1223 ) 

1224 unused = read_zlib_chunks( 

1225 read_some, 

1226 unpacked, 

1227 buffer_size=zlib_bufsize, 

1228 include_comp=include_comp, 

1229 ) 

1230 return unpacked, unused 
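A self-contained sketch of unpack_object on a hand-built blob entry: one header byte (type 3 = blob, size 5, MSB clear) followed by the zlib stream, with 20 padding bytes standing in for the rest of a pack:

    import zlib
    from io import BytesIO
    from dulwich.pack import unpack_object

    entry = bytes([(3 << 4) | 5]) + zlib.compress(b"hello") + b"\x00" * 20
    unpacked, unused = unpack_object(BytesIO(entry).read)
    assert unpacked.obj_type_num == 3
    assert b"".join(unpacked.obj_chunks) == b"hello"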

1231 

1232 

1233def _compute_object_size(value: tuple[int, Any]) -> int: 

1234 """Compute the size of a unresolved object for use with LRUSizeCache.""" 

1235 (num, obj) = value 

1236 if num in DELTA_TYPES: 

1237 return chunks_length(obj[1]) 

1238 return chunks_length(obj) 

1239 

1240 

1241class PackStreamReader: 

1242 """Class to read a pack stream. 

1243 

1244 The pack is read from a ReceivableProtocol using read() or recv() as 

1245 appropriate. 

1246 """ 

1247 

1248 def __init__( 

1249 self, 

1250 read_all: Callable[[int], bytes], 

1251 read_some: Callable[[int], bytes] | None = None, 

1252 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1253 ) -> None: 

1254 """Initialize pack stream reader. 

1255 

1256 Args: 

1257 read_all: Function to read all requested bytes 

1258 read_some: Function to read some bytes (optional) 

1259 zlib_bufsize: Buffer size for zlib decompression 

1260 """ 

1261 self.read_all = read_all 

1262 if read_some is None: 

1263 self.read_some = read_all 

1264 else: 

1265 self.read_some = read_some 

1266 self.sha = sha1() 

1267 self._offset = 0 

1268 self._rbuf = BytesIO() 

1269 # trailer is a deque to avoid memory allocation on small reads 

1270 self._trailer: deque[int] = deque() 

1271 self._zlib_bufsize = zlib_bufsize 

1272 

1273 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1274 """Read up to size bytes using the given callback. 

1275 

1276 As a side effect, update the verifier's hash (excluding the last 20 

1277 bytes read). 

1278 

1279 Args: 

1280 read: The read callback to read from. 

1281 size: The maximum number of bytes to read; the particular 

1282 behavior is callback-specific. 

1283 Returns: Bytes read 

1284 """ 

1285 data = read(size) 

1286 

1287 # maintain a trailer of the last 20 bytes we've read 

1288 n = len(data) 

1289 self._offset += n 

1290 tn = len(self._trailer) 

1291 if n >= 20: 

1292 to_pop = tn 

1293 to_add = 20 

1294 else: 

1295 to_pop = max(n + tn - 20, 0) 

1296 to_add = n 

1297 self.sha.update( 

1298 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)])) 

1299 ) 

1300 self._trailer.extend(data[-to_add:]) 

1301 

1302 # hash everything but the trailer 

1303 self.sha.update(data[:-to_add]) 

1304 return data 

1305 

1306 def _buf_len(self) -> int: 

1307 buf = self._rbuf 

1308 start = buf.tell() 

1309 buf.seek(0, SEEK_END) 

1310 end = buf.tell() 

1311 buf.seek(start) 

1312 return end - start 

1313 

1314 @property 

1315 def offset(self) -> int: 

1316 """Return current offset in the stream.""" 

1317 return self._offset - self._buf_len() 

1318 

1319 def read(self, size: int) -> bytes: 

1320 """Read, blocking until size bytes are read.""" 

1321 buf_len = self._buf_len() 

1322 if buf_len >= size: 

1323 return self._rbuf.read(size) 

1324 buf_data = self._rbuf.read() 

1325 self._rbuf = BytesIO() 

1326 return buf_data + self._read(self.read_all, size - buf_len) 

1327 

1328 def recv(self, size: int) -> bytes: 

1329 """Read up to size bytes, blocking until one byte is read.""" 

1330 buf_len = self._buf_len() 

1331 if buf_len: 

1332 data = self._rbuf.read(size) 

1333 if size >= buf_len: 

1334 self._rbuf = BytesIO() 

1335 return data 

1336 return self._read(self.read_some, size) 

1337 

1338 def __len__(self) -> int: 

1339 """Return the number of objects in this pack.""" 

1340 return self._num_objects 

1341 

1342 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]: 

1343 """Read the objects in this pack file. 

1344 

1345 Args: 

1346 compute_crc32: If True, compute the CRC32 of the compressed 

1347 data. If False, the returned CRC32 will be None. 

1348 Returns: Iterator over UnpackedObjects with the following members set: 

1349 offset 

1350 obj_type_num 

1351 obj_chunks (for non-delta types) 

1352 delta_base (for delta types) 

1353 decomp_chunks 

1354 decomp_len 

1355 crc32 (if compute_crc32 is True) 

1356 

1357 Raises: 

1358 ChecksumMismatch: if the checksum of the pack contents does not 

1359 match the checksum in the pack trailer. 

1360 zlib.error: if an error occurred during zlib decompression. 

1361 IOError: if an error occurred writing to the output file. 

1362 """ 

1363 _pack_version, self._num_objects = read_pack_header(self.read) 

1364 

1365 for _ in range(self._num_objects): 

1366 offset = self.offset 

1367 unpacked, unused = unpack_object( 

1368 self.read, 

1369 read_some=self.recv, 

1370 compute_crc32=compute_crc32, 

1371 zlib_bufsize=self._zlib_bufsize, 

1372 ) 

1373 unpacked.offset = offset 

1374 

1375 # prepend any unused data to current read buffer 

1376 buf = BytesIO() 

1377 buf.write(unused) 

1378 buf.write(self._rbuf.read()) 

1379 buf.seek(0) 

1380 self._rbuf = buf 

1381 

1382 yield unpacked 

1383 

1384 if self._buf_len() < 20: 

1385 # If the read buffer is full, then the last read() got the whole 

1386 # trailer off the wire. If not, it means there is still some of the 

1387 # trailer to read. We need to read() all 20 bytes; N come from the 

1388 # read buffer and (20 - N) come from the wire. 

1389 self.read(20) 

1390 

1391 pack_sha = bytearray(self._trailer) 

1392 if pack_sha != self.sha.digest(): 

1393 raise ChecksumMismatch(sha_to_hex(bytes(pack_sha)), self.sha.hexdigest()) 

1394 

1395 

1396class PackStreamCopier(PackStreamReader): 

1397 """Class to verify a pack stream as it is being read. 

1398 

1399 The pack is read from a ReceivableProtocol using read() or recv() as 

1400 appropriate and written out to the given file-like object. 

1401 """ 

1402 

1403 def __init__( 

1404 self, 

1405 read_all: Callable[[int], bytes], 

1406 read_some: Callable[[int], bytes] | None, 

1407 outfile: IO[bytes], 

1408 delta_iter: "DeltaChainIterator[UnpackedObject] | None" = None, 

1409 ) -> None: 

1410 """Initialize the copier. 

1411 

1412 Args: 

1413 read_all: Read function that blocks until the number of 

1414 requested bytes are read. 

1415 read_some: Read function that returns at least one byte, but may 

1416 not return the number of bytes requested. 

1417 outfile: File-like object to write output through. 

1418 delta_iter: Optional DeltaChainIterator to record deltas as we 

1419 read them. 

1420 """ 

1421 super().__init__(read_all, read_some=read_some) 

1422 self.outfile = outfile 

1423 self._delta_iter = delta_iter 

1424 

1425 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1426 """Read data from the read callback and write it to the file.""" 

1427 data = super()._read(read, size) 

1428 self.outfile.write(data) 

1429 return data 

1430 

1431 def verify(self, progress: Callable[..., None] | None = None) -> None: 

1432 """Verify a pack stream and write it to the output file. 

1433 

1434 See PackStreamReader.read_objects for a list of exceptions this may 

1435 throw. 

1436 """ 

1437 i = 0 # default count of entries if read_objects() is empty 

1438 for i, unpacked in enumerate(self.read_objects()): 

1439 if self._delta_iter: 

1440 self._delta_iter.record(unpacked) 

1441 if progress is not None: 

1442 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii")) 

1443 if progress is not None: 

1444 progress(f"copied {i} pack entries\n".encode("ascii")) 

1445 

1446 

1447def obj_sha(type: int, chunks: bytes | Iterable[bytes]) -> bytes: 

1448 """Compute the SHA for a numeric type and object chunks.""" 

1449 sha = sha1() 

1450 sha.update(object_header(type, chunks_length(chunks))) 

1451 if isinstance(chunks, bytes): 

1452 sha.update(chunks) 

1453 else: 

1454 for chunk in chunks: 

1455 sha.update(chunk) 

1456 return sha.digest() 
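obj_sha hashes the canonical "<type> <len>\0<content>" form of the object, so for a blob (type number 3) containing b"hello" it matches hashing b"blob 5\x00hello" directly:

    from hashlib import sha1 as _sha1
    from dulwich.pack import obj_sha

    assert obj_sha(3, [b"hel", b"lo"]) == _sha1(b"blob 5\x00hello").digest()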

1457 

1458 

1459def compute_file_sha( 

1460 f: IO[bytes], start_ofs: int = 0, end_ofs: int = 0, buffer_size: int = 1 << 16 

1461) -> "HashObject": 

1462 """Hash a portion of a file into a new SHA. 

1463 

1464 Args: 

1465 f: A file-like object to read from that supports seek(). 

1466 start_ofs: The offset in the file to start reading at. 

1467 end_ofs: The offset in the file to end reading at, relative to the 

1468 end of the file. 

1469 buffer_size: A buffer size for reading. 

1470 Returns: A new SHA object updated with data read from the file. 

1471 """ 

1472 sha = sha1() 

1473 f.seek(0, SEEK_END) 

1474 length = f.tell() 

1475 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length: 

1476 raise AssertionError( 

1477 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}" 

1478 ) 

1479 todo = length + end_ofs - start_ofs 

1480 f.seek(start_ofs) 

1481 while todo: 

1482 data = f.read(min(todo, buffer_size)) 

1483 sha.update(data) 

1484 todo -= len(data) 

1485 return sha 
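A short sketch of compute_file_sha hashing everything except a trailing 20-byte checksum, the same way PackData.calculate_checksum() calls it with end_ofs=-20:

    from hashlib import sha1 as _sha1
    from io import BytesIO
    from dulwich.pack import compute_file_sha

    f = BytesIO(b"pack contents" + b"\x00" * 20)
    assert compute_file_sha(f, end_ofs=-20).digest() == _sha1(b"pack contents").digest()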

1486 

1487 

1488class PackData: 

1489 """The data contained in a packfile. 

1490 

1491 Pack files can be accessed both sequentially for exploding a pack, and 

1492 directly with the help of an index to retrieve a specific object. 

1493 

1494 The objects within are either complete or a delta against another. 

1495 

1496 Each object header is variable length. If the MSB of a byte is set it 

1497 indicates that the subsequent byte is still part of the header. 

1498 In the first byte the next three bits are the type, which tells you the type 

1499 of object, and whether it is a delta. The low 4 bits are the lowest bits of the 

1500 size. In each subsequent byte the low 7 bits are the next more significant bits of the 

1501 size, i.e. the last byte of the header contains the most significant bits of the size. 

1502 

1503 For the complete objects the data is stored as zlib deflated data. 

1504 The size in the header is the uncompressed object size, so to uncompress 

1505 you need to just keep feeding data to zlib until you get an object back, 

1506 or it errors on bad data. This is done here by just giving the complete 

1507 buffer from the start of the deflated object on. This is bad, but until I 

1508 get mmap sorted out it will have to do. 

1509 

1510 Currently there are no integrity checks done. Also no attempt is made to 

1511 try and detect the delta case, or a request for an object at the wrong 

1512 position. It will all just throw a zlib or KeyError. 

1513 """ 

1514 

1515 def __init__( 

1516 self, 

1517 filename: str | os.PathLike[str], 

1518 file: IO[bytes] | None = None, 

1519 size: int | None = None, 

1520 *, 

1521 delta_window_size: int | None = None, 

1522 window_memory: int | None = None, 

1523 delta_cache_size: int | None = None, 

1524 depth: int | None = None, 

1525 threads: int | None = None, 

1526 big_file_threshold: int | None = None, 

1527 ) -> None: 

1528 """Create a PackData object representing the pack in the given filename. 

1529 

1530 The file must exist and stay readable until the object is disposed of. 

1531 It must also stay the same size. It will be mapped whenever needed. 

1532 

1533 Currently there is a restriction on the size of the pack as the python 

1534 mmap implementation is flawed. 

1535 """ 

1536 self._filename = filename 

1537 self._size = size 

1538 self._header_size = 12 

1539 self.delta_window_size = delta_window_size 

1540 self.window_memory = window_memory 

1541 self.delta_cache_size = delta_cache_size 

1542 self.depth = depth 

1543 self.threads = threads 

1544 self.big_file_threshold = big_file_threshold 

1545 self._file: IO[bytes] 

1546 

1547 if file is None: 

1548 self._file = GitFile(self._filename, "rb") 

1549 else: 

1550 self._file = file 

1551 (_version, self._num_objects) = read_pack_header(self._file.read) 

1552 

1553 # Use delta_cache_size config if available, otherwise default 

1554 cache_size = delta_cache_size or (1024 * 1024 * 20) 

1555 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]]( 

1556 cache_size, compute_size=_compute_object_size 

1557 ) 

1558 

1559 @property 

1560 def filename(self) -> str: 

1561 """Get the filename of the pack file. 

1562 

1563 Returns: 

1564 Base filename without directory path 

1565 """ 

1566 return os.path.basename(self._filename) 

1567 

1568 @property 

1569 def path(self) -> str | os.PathLike[str]: 

1570 """Get the full path of the pack file. 

1571 

1572 Returns: 

1573 Full path to the pack file 

1574 """ 

1575 return self._filename 

1576 

1577 @classmethod 

1578 def from_file(cls, file: IO[bytes], size: int | None = None) -> "PackData": 

1579 """Create a PackData object from an open file. 

1580 

1581 Args: 

1582 file: Open file object 

1583 size: Optional file size 

1584 

1585 Returns: 

1586 PackData instance 

1587 """ 

1588 return cls(str(file), file=file, size=size) 

1589 

1590 @classmethod 

1591 def from_path(cls, path: str | os.PathLike[str]) -> "PackData": 

1592 """Create a PackData object from a file path. 

1593 

1594 Args: 

1595 path: Path to the pack file 

1596 

1597 Returns: 

1598 PackData instance 

1599 """ 

1600 return cls(filename=path) 

1601 

1602 def close(self) -> None: 

1603 """Close the underlying pack file.""" 

1604 self._file.close() 

1605 

1606 def __enter__(self) -> "PackData": 

1607 """Enter context manager.""" 

1608 return self 

1609 

1610 def __exit__( 

1611 self, 

1612 exc_type: type | None, 

1613 exc_val: BaseException | None, 

1614 exc_tb: TracebackType | None, 

1615 ) -> None: 

1616 """Exit context manager.""" 

1617 self.close() 

1618 

1619 def __eq__(self, other: object) -> bool: 

1620 """Check equality with another object.""" 

1621 if isinstance(other, PackData): 

1622 return self.get_stored_checksum() == other.get_stored_checksum() 

1623 return False 

1624 

1625 def _get_size(self) -> int: 

1626 if self._size is not None: 

1627 return self._size 

1628 self._size = os.path.getsize(self._filename) 

1629 if self._size < self._header_size: 

1630 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})" 

1631 raise AssertionError(errmsg) 

1632 return self._size 

1633 

1634 def __len__(self) -> int: 

1635 """Returns the number of objects in this pack.""" 

1636 return self._num_objects 

1637 

1638 def calculate_checksum(self) -> bytes: 

1639 """Calculate the checksum for this pack. 

1640 

1641 Returns: 20-byte binary SHA1 digest 

1642 """ 

1643 return compute_file_sha(self._file, end_ofs=-20).digest() 

1644 

1645 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]: 

1646 """Iterate over unpacked objects in the pack.""" 

1647 self._file.seek(self._header_size) 

1648 

1649 if self._num_objects is None: 

1650 return 

1651 

1652 for _ in range(self._num_objects): 

1653 offset = self._file.tell() 

1654 unpacked, unused = unpack_object( 

1655 self._file.read, compute_crc32=False, include_comp=include_comp 

1656 ) 

1657 unpacked.offset = offset 

1658 yield unpacked 

1659 # Back up over unused data. 

1660 self._file.seek(-len(unused), SEEK_CUR) 

1661 

1662 def iterentries( 

1663 self, 

1664 progress: Callable[[int, int], None] | None = None, 

1665 resolve_ext_ref: ResolveExtRefFn | None = None, 

1666 ) -> Iterator[tuple[bytes, int, int | None]]: 

1667 """Yield entries summarizing the contents of this pack. 

1668 

1669 Args: 

1670 progress: Progress function, called with current and total 

1671 object count. 

1672 resolve_ext_ref: Optional function to resolve external references 

1673 Returns: iterator of tuples with (sha, offset, crc32) 

1674 """ 

1675 num_objects = self._num_objects 

1676 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref) 

1677 for i, result in enumerate(indexer): 

1678 if progress is not None: 

1679 progress(i, num_objects) 

1680 yield result 

1681 

1682 def sorted_entries( 

1683 self, 

1684 progress: ProgressFn | None = None, 

1685 resolve_ext_ref: ResolveExtRefFn | None = None, 

1686 ) -> list[tuple[bytes, int, int]]: 

1687 """Return entries in this pack, sorted by SHA. 

1688 

1689 Args: 

1690 progress: Progress function, called with current and total 

1691 object count 

1692 resolve_ext_ref: Optional function to resolve external references 

1693 Returns: Iterator of tuples with (sha, offset, crc32) 

1694 """ 

1695 return sorted( 

1696 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) # type: ignore 

1697 ) 

1698 

1699 def create_index_v1( 

1700 self, 

1701 filename: str, 

1702 progress: Callable[..., None] | None = None, 

1703 resolve_ext_ref: ResolveExtRefFn | None = None, 

1704 ) -> bytes: 

1705 """Create a version 1 file for this data file. 

1706 

1707 Args: 

1708 filename: Index filename. 

1709 progress: Progress report function 

1710 resolve_ext_ref: Optional function to resolve external references 

1711 Returns: Checksum of index file 

1712 """ 

1713 entries = self.sorted_entries( 

1714 progress=progress, resolve_ext_ref=resolve_ext_ref 

1715 ) 

1716 checksum = self.calculate_checksum() 

1717 with GitFile(filename, "wb") as f: 

1718 write_pack_index_v1( 

1719 f, 

1720 entries, 

1721 checksum, 

1722 ) 

1723 return checksum 

1724 

1725 def create_index_v2( 

1726 self, 

1727 filename: str, 

1728 progress: Callable[..., None] | None = None, 

1729 resolve_ext_ref: ResolveExtRefFn | None = None, 

1730 ) -> bytes: 

1731 """Create a version 2 index file for this data file. 

1732 

1733 Args: 

1734 filename: Index filename. 

1735 progress: Progress report function 

1736 resolve_ext_ref: Optional function to resolve external references 

1737 Returns: Checksum of index file 

1738 """ 

1739 entries = self.sorted_entries( 

1740 progress=progress, resolve_ext_ref=resolve_ext_ref 

1741 ) 

1742 with GitFile(filename, "wb") as f: 

1743 return write_pack_index_v2(f, entries, self.calculate_checksum()) 

1744 

1745 def create_index_v3( 

1746 self, 

1747 filename: str, 

1748 progress: Callable[..., None] | None = None, 

1749 resolve_ext_ref: ResolveExtRefFn | None = None, 

1750 hash_algorithm: int = 1, 

1751 ) -> bytes: 

1752 """Create a version 3 index file for this data file. 

1753 

1754 Args: 

1755 filename: Index filename. 

1756 progress: Progress report function 

1757 resolve_ext_ref: Function to resolve external references 

1758 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1759 Returns: Checksum of index file 

1760 """ 

1761 entries = self.sorted_entries( 

1762 progress=progress, resolve_ext_ref=resolve_ext_ref 

1763 ) 

1764 with GitFile(filename, "wb") as f: 

1765 return write_pack_index_v3( 

1766 f, entries, self.calculate_checksum(), hash_algorithm 

1767 ) 

1768 

1769 def create_index( 

1770 self, 

1771 filename: str, 

1772 progress: Callable[..., None] | None = None, 

1773 version: int = 2, 

1774 resolve_ext_ref: ResolveExtRefFn | None = None, 

1775 hash_algorithm: int = 1, 

1776 ) -> bytes: 

1777 """Create an index file for this data file. 

1778 

1779 Args: 

1780 filename: Index filename. 

1781 progress: Progress report function 

1782 version: Index version (1, 2, or 3) 

1783 resolve_ext_ref: Function to resolve external references 

1784 hash_algorithm: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256) 

1785 Returns: Checksum of index file 

1786 """ 

1787 if version == 1: 

1788 return self.create_index_v1( 

1789 filename, progress, resolve_ext_ref=resolve_ext_ref 

1790 ) 

1791 elif version == 2: 

1792 return self.create_index_v2( 

1793 filename, progress, resolve_ext_ref=resolve_ext_ref 

1794 ) 

1795 elif version == 3: 

1796 return self.create_index_v3( 

1797 filename, 

1798 progress, 

1799 resolve_ext_ref=resolve_ext_ref, 

1800 hash_algorithm=hash_algorithm, 

1801 ) 

1802 else: 

1803 raise ValueError(f"unknown index format {version}") 

1804 

1805 def get_stored_checksum(self) -> bytes: 

1806 """Return the expected checksum stored in this pack.""" 

1807 self._file.seek(-20, SEEK_END) 

1808 return self._file.read(20) 

1809 

1810 def check(self) -> None: 

1811 """Check the consistency of this pack.""" 

1812 actual = self.calculate_checksum() 

1813 stored = self.get_stored_checksum() 

1814 if actual != stored: 

1815 raise ChecksumMismatch(stored, actual) 

1816 

1817 def get_unpacked_object_at( 

1818 self, offset: int, *, include_comp: bool = False 

1819 ) -> UnpackedObject: 

1820 """Given offset in the packfile return a UnpackedObject.""" 

1821 assert offset >= self._header_size 

1822 self._file.seek(offset) 

1823 unpacked, _ = unpack_object(self._file.read, include_comp=include_comp) 

1824 unpacked.offset = offset 

1825 return unpacked 

1826 

1827 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]: 

1828 """Given an offset in to the packfile return the object that is there. 

1829 

1830 Using the associated index the location of an object can be looked up, 

1831 and then the packfile can be asked directly for that object using this 

1832 function. 

1833 """ 

1834 try: 

1835 return self._offset_cache[offset] 

1836 except KeyError: 

1837 pass 

1838 unpacked = self.get_unpacked_object_at(offset, include_comp=False) 

1839 return (unpacked.pack_type_num, unpacked._obj()) 

1840 

1841 
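# Illustrative sketch (not part of dulwich): typical use of the PackData class
# above. The pack path is hypothetical; the calls shown (from_path, check,
# iter_unpacked, create_index) are the ones implemented in this class.
def _example_pack_data_usage() -> None:
    with PackData.from_path("objects/pack/pack-1234.pack") as data:  # hypothetical path
        data.check()  # verify the trailing SHA-1 over the pack contents
        print(f"{len(data)} objects, checksum {data.get_stored_checksum().hex()}")
        for unpacked in data.iter_unpacked():
            print(unpacked.offset, unpacked.pack_type_num)
        # Build the companion .idx so the pack can be used for lookups.
        data.create_index("objects/pack/pack-1234.idx", version=2)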

1842T = TypeVar("T") 

1843 

1844 

1845class DeltaChainIterator(Generic[T]): 

1846 """Abstract iterator over pack data based on delta chains. 

1847 

1848 Each object in the pack is guaranteed to be inflated exactly once, 

1849 regardless of how many objects reference it as a delta base. As a result, 

1850 memory usage is proportional to the length of the longest delta chain. 

1851 

1852 Subclasses can override _result to define the result type of the iterator. 

1853 By default, results are UnpackedObjects with the following members set: 

1854 

1855 * offset 

1856 * obj_type_num 

1857 * obj_chunks 

1858 * pack_type_num 

1859 * delta_base (for delta types) 

1860 * comp_chunks (if _include_comp is True) 

1861 * decomp_chunks 

1862 * decomp_len 

1863 * crc32 (if _compute_crc32 is True) 

1864 """ 

1865 

1866 _compute_crc32 = False 

1867 _include_comp = False 

1868 

1869 def __init__( 

1870 self, 

1871 file_obj: IO[bytes] | None, 

1872 *, 

1873 resolve_ext_ref: ResolveExtRefFn | None = None, 

1874 ) -> None: 

1875 """Initialize DeltaChainIterator. 

1876 

1877 Args: 

1878 file_obj: File object to read pack data from 

1879 resolve_ext_ref: Optional function to resolve external references 

1880 """ 

1881 self._file = file_obj 

1882 self._resolve_ext_ref = resolve_ext_ref 

1883 self._pending_ofs: dict[int, list[int]] = defaultdict(list) 

1884 self._pending_ref: dict[bytes, list[int]] = defaultdict(list) 

1885 self._full_ofs: list[tuple[int, int]] = [] 

1886 self._ext_refs: list[bytes] = [] 

1887 

1888 @classmethod 

1889 def for_pack_data( 

1890 cls, pack_data: PackData, resolve_ext_ref: ResolveExtRefFn | None = None 

1891 ) -> "DeltaChainIterator[T]": 

1892 """Create a DeltaChainIterator from pack data. 

1893 

1894 Args: 

1895 pack_data: PackData object to iterate 

1896 resolve_ext_ref: Optional function to resolve external refs 

1897 

1898 Returns: 

1899 DeltaChainIterator instance 

1900 """ 

1901 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1902 walker.set_pack_data(pack_data) 

1903 for unpacked in pack_data.iter_unpacked(include_comp=False): 

1904 walker.record(unpacked) 

1905 return walker 

1906 

1907 @classmethod 

1908 def for_pack_subset( 

1909 cls, 

1910 pack: "Pack", 

1911 shas: Iterable[bytes], 

1912 *, 

1913 allow_missing: bool = False, 

1914 resolve_ext_ref: ResolveExtRefFn | None = None, 

1915 ) -> "DeltaChainIterator[T]": 

1916 """Create a DeltaChainIterator for a subset of objects. 

1917 

1918 Args: 

1919 pack: Pack object containing the data 

1920 shas: Iterable of object SHAs to include 

1921 allow_missing: If True, skip missing objects 

1922 resolve_ext_ref: Optional function to resolve external refs 

1923 

1924 Returns: 

1925 DeltaChainIterator instance 

1926 """ 

1927 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1928 walker.set_pack_data(pack.data) 

1929 todo = set() 

1930 for sha in shas: 

1931 assert isinstance(sha, bytes) 

1932 try: 

1933 off = pack.index.object_offset(sha) 

1934 except KeyError: 

1935 if not allow_missing: 

1936 raise 

1937 else: 

1938 todo.add(off) 

1939 done = set() 

1940 while todo: 

1941 off = todo.pop() 

1942 unpacked = pack.data.get_unpacked_object_at(off) 

1943 walker.record(unpacked) 

1944 done.add(off) 

1945 base_ofs = None 

1946 if unpacked.pack_type_num == OFS_DELTA: 

1947 assert unpacked.offset is not None 

1948 assert unpacked.delta_base is not None 

1949 assert isinstance(unpacked.delta_base, int) 

1950 base_ofs = unpacked.offset - unpacked.delta_base 

1951 elif unpacked.pack_type_num == REF_DELTA: 

1952 with suppress(KeyError): 

1953 assert isinstance(unpacked.delta_base, bytes) 

1954 base_ofs = pack.index.object_index(unpacked.delta_base) 

1955 if base_ofs is not None and base_ofs not in done: 

1956 todo.add(base_ofs) 

1957 return walker 

1958 

1959 def record(self, unpacked: UnpackedObject) -> None: 

1960 """Record an unpacked object for later processing. 

1961 

1962 Args: 

1963 unpacked: UnpackedObject to record 

1964 """ 

1965 type_num = unpacked.pack_type_num 

1966 offset = unpacked.offset 

1967 assert offset is not None 

1968 if type_num == OFS_DELTA: 

1969 assert unpacked.delta_base is not None 

1970 assert isinstance(unpacked.delta_base, int) 

1971 base_offset = offset - unpacked.delta_base 

1972 self._pending_ofs[base_offset].append(offset) 

1973 elif type_num == REF_DELTA: 

1974 assert isinstance(unpacked.delta_base, bytes) 

1975 self._pending_ref[unpacked.delta_base].append(offset) 

1976 else: 

1977 self._full_ofs.append((offset, type_num)) 

1978 

1979 def set_pack_data(self, pack_data: PackData) -> None: 

1980 """Set the pack data for iteration. 

1981 

1982 Args: 

1983 pack_data: PackData object to use 

1984 """ 

1985 self._file = pack_data._file 

1986 

1987 def _walk_all_chains(self) -> Iterator[T]: 

1988 for offset, type_num in self._full_ofs: 

1989 yield from self._follow_chain(offset, type_num, None) 

1990 yield from self._walk_ref_chains() 

1991 assert not self._pending_ofs, repr(self._pending_ofs) 

1992 

1993 def _ensure_no_pending(self) -> None: 

1994 if self._pending_ref: 

1995 raise UnresolvedDeltas([sha_to_hex(s) for s in self._pending_ref]) 

1996 

1997 def _walk_ref_chains(self) -> Iterator[T]: 

1998 if not self._resolve_ext_ref: 

1999 self._ensure_no_pending() 

2000 return 

2001 

2002 for base_sha, pending in sorted(self._pending_ref.items()): 

2003 if base_sha not in self._pending_ref: 

2004 continue 

2005 try: 

2006 type_num, chunks = self._resolve_ext_ref(base_sha) 

2007 except KeyError: 

2008 # Not an external ref, but may depend on one. Either it will 

2009 # get popped via a _follow_chain call, or we will raise an 

2010 # error below. 

2011 continue 

2012 self._ext_refs.append(base_sha) 

2013 self._pending_ref.pop(base_sha) 

2014 for new_offset in pending: 

2015 yield from self._follow_chain(new_offset, type_num, chunks) # type: ignore[arg-type] 

2016 

2017 self._ensure_no_pending() 

2018 

2019 def _result(self, unpacked: UnpackedObject) -> T: 

2020 raise NotImplementedError 

2021 

2022 def _resolve_object( 

2023 self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None 

2024 ) -> UnpackedObject: 

2025 assert self._file is not None 

2026 self._file.seek(offset) 

2027 unpacked, _ = unpack_object( 

2028 self._file.read, 

2029 include_comp=self._include_comp, 

2030 compute_crc32=self._compute_crc32, 

2031 ) 

2032 unpacked.offset = offset 

2033 if base_chunks is None: 

2034 assert unpacked.pack_type_num == obj_type_num 

2035 else: 

2036 assert unpacked.pack_type_num in DELTA_TYPES 

2037 unpacked.obj_type_num = obj_type_num 

2038 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks) 

2039 return unpacked 

2040 

2041 def _follow_chain( 

2042 self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None 

2043 ) -> Iterator[T]: 

2044 # Unlike PackData.get_object_at, there is no need to cache offsets as 

2045 # this approach by design inflates each object exactly once. 

2046 todo = [(offset, obj_type_num, base_chunks)] 

2047 while todo: 

2048 (offset, obj_type_num, base_chunks) = todo.pop() 

2049 unpacked = self._resolve_object(offset, obj_type_num, base_chunks) 

2050 yield self._result(unpacked) 

2051 

2052 assert unpacked.offset is not None 

2053 unblocked = chain( 

2054 self._pending_ofs.pop(unpacked.offset, []), 

2055 self._pending_ref.pop(unpacked.sha(), []), 

2056 ) 

2057 todo.extend( 

2058 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore 

2059 for new_offset in unblocked 

2060 ) 

2061 

2062 def __iter__(self) -> Iterator[T]: 

2063 """Iterate over objects in the pack.""" 

2064 return self._walk_all_chains() 

2065 

2066 def ext_refs(self) -> list[bytes]: 

2067 """Return external references.""" 

2068 return self._ext_refs 

2069 

2070 
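# Illustrative sketch (not part of dulwich): the smallest useful subclass of
# DeltaChainIterator, in the spirit of the PackIndexer/PackInflater classes
# below. Overriding _result decides what gets yielded once each delta chain has
# been resolved; per the class docstring, obj_type_num is set on the results.
class _TypeAndShaIterator(DeltaChainIterator[tuple[bytes, int]]):
    """Yields (binary sha, resolved type number) pairs."""

    def _result(self, unpacked: UnpackedObject) -> tuple[bytes, int]:
        assert unpacked.obj_type_num is not None
        return unpacked.sha(), unpacked.obj_type_num


# Usage: for sha, type_num in _TypeAndShaIterator.for_pack_data(pack_data): ...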

2071class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]): 

2072 """Delta chain iterator that yield unpacked objects.""" 

2073 

2074 def _result(self, unpacked: UnpackedObject) -> UnpackedObject: 

2075 """Return the unpacked object. 

2076 

2077 Args: 

2078 unpacked: The unpacked object 

2079 

2080 Returns: 

2081 The unpacked object unchanged 

2082 """ 

2083 return unpacked 

2084 

2085 

2086class PackIndexer(DeltaChainIterator[PackIndexEntry]): 

2087 """Delta chain iterator that yields index entries.""" 

2088 

2089 _compute_crc32 = True 

2090 

2091 def _result(self, unpacked: UnpackedObject) -> tuple[bytes, int, int | None]: 

2092 """Convert unpacked object to pack index entry. 

2093 

2094 Args: 

2095 unpacked: The unpacked object 

2096 

2097 Returns: 

2098 Tuple of (sha, offset, crc32) for index entry 

2099 """ 

2100 assert unpacked.offset is not None 

2101 return unpacked.sha(), unpacked.offset, unpacked.crc32 

2102 

2103 

2104class PackInflater(DeltaChainIterator[ShaFile]): 

2105 """Delta chain iterator that yields ShaFile objects.""" 

2106 

2107 def _result(self, unpacked: UnpackedObject) -> ShaFile: 

2108 """Convert unpacked object to ShaFile. 

2109 

2110 Args: 

2111 unpacked: The unpacked object 

2112 

2113 Returns: 

2114 ShaFile object from the unpacked data 

2115 """ 

2116 return unpacked.sha_file() 

2117 

2118 
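# Illustrative sketch (not part of dulwich): inflating every object in a pack to
# full ShaFile instances via PackInflater, e.g. to copy them into a loose-object
# store. The pack_data argument is assumed to be an open PackData instance.
def _example_inflate_all(pack_data: PackData) -> list[ShaFile]:
    return list(PackInflater.for_pack_data(pack_data))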

2119class SHA1Reader(BinaryIO): 

2120 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2121 

2122 def __init__(self, f: IO[bytes]) -> None: 

2123 """Initialize SHA1Reader. 

2124 

2125 Args: 

2126 f: File-like object to wrap 

2127 """ 

2128 self.f = f 

2129 self.sha1 = sha1(b"") 

2130 

2131 def read(self, size: int = -1) -> bytes: 

2132 """Read bytes and update SHA1. 

2133 

2134 Args: 

2135 size: Number of bytes to read, -1 for all 

2136 

2137 Returns: 

2138 Bytes read from file 

2139 """ 

2140 data = self.f.read(size) 

2141 self.sha1.update(data) 

2142 return data 

2143 

2144 def check_sha(self, allow_empty: bool = False) -> None: 

2145 """Check if the SHA1 matches the expected value. 

2146 

2147 Args: 

2148 allow_empty: Allow empty SHA1 hash 

2149 

2150 Raises: 

2151 ChecksumMismatch: If SHA1 doesn't match 

2152 """ 

2153 stored = self.f.read(20) 

2154 # If git option index.skipHash is set the index will be empty 

2155 if stored != self.sha1.digest() and ( 

2156 not allow_empty 

2157 or sha_to_hex(stored) != b"0000000000000000000000000000000000000000" 

2158 ): 

2159 raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored)) 

2160 

2161 def close(self) -> None: 

2162 """Close the underlying file.""" 

2163 return self.f.close() 

2164 

2165 def tell(self) -> int: 

2166 """Return current file position.""" 

2167 return self.f.tell() 

2168 

2169 # BinaryIO abstract methods 

2170 def readable(self) -> bool: 

2171 """Check if file is readable.""" 

2172 return True 

2173 

2174 def writable(self) -> bool: 

2175 """Check if file is writable.""" 

2176 return False 

2177 

2178 def seekable(self) -> bool: 

2179 """Check if file is seekable.""" 

2180 return getattr(self.f, "seekable", lambda: False)() 

2181 

2182 def seek(self, offset: int, whence: int = 0) -> int: 

2183 """Seek to position in file. 

2184 

2185 Args: 

2186 offset: Position offset 

2187 whence: Reference point (0=start, 1=current, 2=end) 

2188 

2189 Returns: 

2190 New file position 

2191 """ 

2192 return self.f.seek(offset, whence) 

2193 

2194 def flush(self) -> None: 

2195 """Flush the file buffer.""" 

2196 if hasattr(self.f, "flush"): 

2197 self.f.flush() 

2198 

2199 def readline(self, size: int = -1) -> bytes: 

2200 """Read a line from the file. 

2201 

2202 Args: 

2203 size: Maximum bytes to read 

2204 

2205 Returns: 

2206 Line read from file 

2207 """ 

2208 return self.f.readline(size) 

2209 

2210 def readlines(self, hint: int = -1) -> list[bytes]: 

2211 """Read all lines from the file. 

2212 

2213 Args: 

2214 hint: Approximate number of bytes to read 

2215 

2216 Returns: 

2217 List of lines 

2218 """ 

2219 return self.f.readlines(hint) 

2220 

2221 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2222 """Write multiple lines to the file (not supported).""" 

2223 raise UnsupportedOperation("writelines") 

2224 

2225 def write(self, data: bytes, /) -> int: # type: ignore[override] 

2226 """Write data to the file (not supported).""" 

2227 raise UnsupportedOperation("write") 

2228 

2229 def __enter__(self) -> "SHA1Reader": 

2230 """Enter context manager.""" 

2231 return self 

2232 

2233 def __exit__( 

2234 self, 

2235 type: type | None, 

2236 value: BaseException | None, 

2237 traceback: TracebackType | None, 

2238 ) -> None: 

2239 """Exit context manager and close file.""" 

2240 self.close() 

2241 

2242 def __iter__(self) -> "SHA1Reader": 

2243 """Return iterator for reading file lines.""" 

2244 return self 

2245 

2246 def __next__(self) -> bytes: 

2247 """Get next line from file. 

2248 

2249 Returns: 

2250 Next line 

2251 

2252 Raises: 

2253 StopIteration: When no more lines 

2254 """ 

2255 line = self.readline() 

2256 if not line: 

2257 raise StopIteration 

2258 return line 

2259 

2260 def fileno(self) -> int: 

2261 """Return file descriptor number.""" 

2262 return self.f.fileno() 

2263 

2264 def isatty(self) -> bool: 

2265 """Check if file is a terminal.""" 

2266 return getattr(self.f, "isatty", lambda: False)() 

2267 

2268 def truncate(self, size: int | None = None) -> int: 

2269 """Not supported for read-only file. 

2270 

2271 Raises: 

2272 UnsupportedOperation: Always raised 

2273 """ 

2274 raise UnsupportedOperation("truncate") 

2275 

2276 
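# Illustrative sketch (not part of dulwich): using SHA1Reader to verify the
# trailing checksum of a pack index file. The path is supplied by the caller;
# the reader hashes everything it reads, and check_sha() then compares that
# against the final 20 bytes of the file.
def _example_verify_idx(path: str) -> None:
    size = os.path.getsize(path)
    with open(path, "rb") as raw:
        reader = SHA1Reader(raw)
        reader.read(size - 20)  # hash the index body
        reader.check_sha()  # reads the stored digest; raises ChecksumMismatch on corruption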

2277class SHA1Writer(BinaryIO): 

2278 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2279 

2280 def __init__(self, f: BinaryIO | IO[bytes]) -> None: 

2281 """Initialize SHA1Writer. 

2282 

2283 Args: 

2284 f: File-like object to wrap 

2285 """ 

2286 self.f = f 

2287 self.length = 0 

2288 self.sha1 = sha1(b"") 

2289 self.digest: bytes | None = None 

2290 

2291 def write(self, data: bytes | bytearray | memoryview, /) -> int: # type: ignore[override] 

2292 """Write data and update SHA1. 

2293 

2294 Args: 

2295 data: Data to write 

2296 

2297 Returns: 

2298 Number of bytes written 

2299 """ 

2300 self.sha1.update(data) 

2301 written = self.f.write(data) 

2302 self.length += written 

2303 return written 

2304 

2305 def write_sha(self) -> bytes: 

2306 """Write the SHA1 digest to the file. 

2307 

2308 Returns: 

2309 The SHA1 digest bytes 

2310 """ 

2311 sha = self.sha1.digest() 

2312 assert len(sha) == 20 

2313 self.f.write(sha) 

2314 self.length += len(sha) 

2315 return sha 

2316 

2317 def close(self) -> None: 

2318 """Close the pack file and finalize the SHA.""" 

2319 self.digest = self.write_sha() 

2320 self.f.close() 

2321 

2322 def offset(self) -> int: 

2323 """Get the total number of bytes written. 

2324 

2325 Returns: 

2326 Total bytes written 

2327 """ 

2328 return self.length 

2329 

2330 def tell(self) -> int: 

2331 """Return current file position.""" 

2332 return self.f.tell() 

2333 

2334 # BinaryIO abstract methods 

2335 def readable(self) -> bool: 

2336 """Check if file is readable.""" 

2337 return False 

2338 

2339 def writable(self) -> bool: 

2340 """Check if file is writable.""" 

2341 return True 

2342 

2343 def seekable(self) -> bool: 

2344 """Check if file is seekable.""" 

2345 return getattr(self.f, "seekable", lambda: False)() 

2346 

2347 def seek(self, offset: int, whence: int = 0) -> int: 

2348 """Seek to position in file. 

2349 

2350 Args: 

2351 offset: Position offset 

2352 whence: Reference point (0=start, 1=current, 2=end) 

2353 

2354 Returns: 

2355 New file position 

2356 """ 

2357 return self.f.seek(offset, whence) 

2358 

2359 def flush(self) -> None: 

2360 """Flush the file buffer.""" 

2361 if hasattr(self.f, "flush"): 

2362 self.f.flush() 

2363 

2364 def readline(self, size: int = -1) -> bytes: 

2365 """Not supported for write-only file. 

2366 

2367 Raises: 

2368 UnsupportedOperation: Always raised 

2369 """ 

2370 raise UnsupportedOperation("readline") 

2371 

2372 def readlines(self, hint: int = -1) -> list[bytes]: 

2373 """Not supported for write-only file. 

2374 

2375 Raises: 

2376 UnsupportedOperation: Always raised 

2377 """ 

2378 raise UnsupportedOperation("readlines") 

2379 

2380 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2381 """Write multiple lines to the file. 

2382 

2383 Args: 

2384 lines: Iterable of lines to write 

2385 """ 

2386 for line in lines: 

2387 self.write(line) 

2388 

2389 def read(self, size: int = -1) -> bytes: 

2390 """Not supported for write-only file. 

2391 

2392 Raises: 

2393 UnsupportedOperation: Always raised 

2394 """ 

2395 raise UnsupportedOperation("read") 

2396 

2397 def __enter__(self) -> "SHA1Writer": 

2398 """Enter context manager.""" 

2399 return self 

2400 

2401 def __exit__( 

2402 self, 

2403 type: type | None, 

2404 value: BaseException | None, 

2405 traceback: TracebackType | None, 

2406 ) -> None: 

2407 """Exit context manager and close file.""" 

2408 self.close() 

2409 

2410 def __iter__(self) -> "SHA1Writer": 

2411 """Return iterator.""" 

2412 return self 

2413 

2414 def __next__(self) -> bytes: 

2415 """Not supported for write-only file. 

2416 

2417 Raises: 

2418 UnsupportedOperation: Always raised 

2419 """ 

2420 raise UnsupportedOperation("__next__") 

2421 

2422 def fileno(self) -> int: 

2423 """Return file descriptor number.""" 

2424 return self.f.fileno() 

2425 

2426 def isatty(self) -> bool: 

2427 """Check if file is a terminal.""" 

2428 return getattr(self.f, "isatty", lambda: False)() 

2429 

2430 def truncate(self, size: int | None = None) -> int: 

2431 """Not supported for write-only file. 

2432 

2433 Raises: 

2434 UnsupportedOperation: Always raised 

2435 """ 

2436 raise UnsupportedOperation("truncate") 

2437 

2438 
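# Illustrative sketch (not part of dulwich): SHA1Writer transparently hashes
# everything written through it, which is how pack and index files get their
# trailing checksums.
def _example_sha1_writer() -> bytes:
    buf = BytesIO()
    writer = SHA1Writer(buf)
    writer.write(b"some pack payload")
    digest = writer.write_sha()  # appends the 20-byte digest without closing
    data = buf.getvalue()
    assert data[-20:] == digest == sha1(b"some pack payload").digest()
    return data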

2439def pack_object_header( 

2440 type_num: int, delta_base: bytes | int | None, size: int 

2441) -> bytearray: 

2442 """Create a pack object header for the given object info. 

2443 

2444 Args: 

2445 type_num: Numeric type of the object. 

2446 delta_base: Delta base offset or ref, or None for whole objects. 

2447 size: Uncompressed object size. 

2448 Returns: A header for a packed object. 

2449 """ 

2450 header = [] 

2451 c = (type_num << 4) | (size & 15) 

2452 size >>= 4 

2453 while size: 

2454 header.append(c | 0x80) 

2455 c = size & 0x7F 

2456 size >>= 7 

2457 header.append(c) 

2458 if type_num == OFS_DELTA: 

2459 assert isinstance(delta_base, int) 

2460 ret = [delta_base & 0x7F] 

2461 delta_base >>= 7 

2462 while delta_base: 

2463 delta_base -= 1 

2464 ret.insert(0, 0x80 | (delta_base & 0x7F)) 

2465 delta_base >>= 7 

2466 header.extend(ret) 

2467 elif type_num == REF_DELTA: 

2468 assert isinstance(delta_base, bytes) 

2469 assert len(delta_base) == 20 

2470 header += delta_base 

2471 return bytearray(header) 

2472 

2473 
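# Worked example (not part of dulwich): the header produced for a 100-byte blob
# (type_num 3). The low nibble of the first byte carries size bits 0-3, the type
# sits in bits 4-6, and the 0x80 continuation flag says more size bytes follow.
assert pack_object_header(3, None, 100) == bytearray([0xB4, 0x06])
# 0xB4 = 0x80 | (3 << 4) | (100 & 0xF); 0x06 = 100 >> 4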

2474def pack_object_chunks( 

2475 type: int, 

2476 object: list[bytes] | tuple[bytes | int, list[bytes]], 

2477 compression_level: int = -1, 

2478) -> Iterator[bytes]: 

2479 """Generate chunks for a pack object. 

2480 

2481 Args: 

2482 type: Numeric type of the object 

2483 object: Object to write 

2484 compression_level: the zlib compression level 

2485 Returns: Chunks 

2486 """ 

2487 if type in DELTA_TYPES: 

2488 if isinstance(object, tuple): 

2489 delta_base, object = object 

2490 else: 

2491 raise TypeError("Delta types require a tuple of (delta_base, object)") 

2492 else: 

2493 delta_base = None 

2494 

2495 # Convert object to list of bytes chunks 

2496 if isinstance(object, bytes): 

2497 chunks = [object] 

2498 elif isinstance(object, list): 

2499 chunks = object 

2500 elif isinstance(object, ShaFile): 

2501 chunks = object.as_raw_chunks() 

2502 else: 

2503 # Shouldn't reach here with proper typing 

2504 raise TypeError(f"Unexpected object type: {object.__class__.__name__}") 

2505 

2506 yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks)))) 

2507 compressor = zlib.compressobj(level=compression_level) 

2508 for data in chunks: 

2509 yield compressor.compress(data) 

2510 yield compressor.flush() 

2511 

2512 

2513def write_pack_object( 

2514 write: Callable[[bytes], int], 

2515 type: int, 

2516 object: list[bytes] | tuple[bytes | int, list[bytes]], 

2517 sha: "HashObject | None" = None, 

2518 compression_level: int = -1, 

2519) -> int: 

2520 """Write pack object to a file. 

2521 

2522 Args: 

2523 write: Write function to use 

2524 type: Numeric type of the object 

2525 object: Object to write 

2526 sha: Optional SHA-1 hasher to update 

2527 compression_level: the zlib compression level 

2528 Returns: CRC32 checksum of the written object 

2529 """ 

2530 crc32 = 0 

2531 for chunk in pack_object_chunks(type, object, compression_level=compression_level): 

2532 write(chunk) 

2533 if sha is not None: 

2534 sha.update(chunk) 

2535 crc32 = binascii.crc32(chunk, crc32) 

2536 return crc32 & 0xFFFFFFFF 

2537 

2538 
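# Illustrative sketch (not part of dulwich): round-tripping a single blob payload
# through write_pack_object and unpack_object (defined earlier in this module).
# Type number 3 is the blob type; the content is arbitrary.
def _example_object_round_trip() -> None:
    buf = BytesIO()
    crc32 = write_pack_object(buf.write, 3, [b"hello world"])
    buf.seek(0)
    unpacked, _unused = unpack_object(buf.read)
    assert unpacked.pack_type_num == 3
    assert b"".join(unpacked.decomp_chunks) == b"hello world"
    assert crc32 == binascii.crc32(buf.getvalue()) & 0xFFFFFFFF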

2539def write_pack( 

2540 filename: str, 

2541 objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]], 

2542 *, 

2543 deltify: bool | None = None, 

2544 delta_window_size: int | None = None, 

2545 compression_level: int = -1, 

2546) -> tuple[bytes, bytes]: 

2547 """Write a new pack data file. 

2548 

2549 Args: 

2550 filename: Path to the new pack file (without .pack extension) 

2551 objects: Objects to write to the pack 

2552 delta_window_size: Delta window size 

2553 deltify: Whether to deltify pack objects 

2554 compression_level: the zlib compression level 

2555 Returns: Tuple with checksum of pack file and index file 

2556 """ 

2557 with GitFile(filename + ".pack", "wb") as f: 

2558 entries, data_sum = write_pack_objects( 

2559 f, 

2560 objects, 

2561 delta_window_size=delta_window_size, 

2562 deltify=deltify, 

2563 compression_level=compression_level, 

2564 ) 

2565 entries_list = sorted([(k, v[0], v[1]) for (k, v) in entries.items()]) 

2566 with GitFile(filename + ".idx", "wb") as f: 

2567 idx_sha = write_pack_index(f, entries_list, data_sum) 

2568 return data_sum, idx_sha 

2569 

2570 
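# Illustrative sketch (not part of dulwich): writing a one-object pack plus index
# to disk and reopening it through the Pack class defined later in this module.
# Blob lives in dulwich.objects; the target path is hypothetical, and write_pack
# appends ".pack"/".idx" to it itself.
def _example_write_small_pack() -> None:
    from .objects import Blob

    blob = Blob.from_string(b"hello, pack\n")
    data_sum, idx_sum = write_pack("/tmp/pack-demo", [blob])  # hypothetical path
    assert len(data_sum) == len(idx_sum) == 20
    pack = Pack("/tmp/pack-demo")
    assert blob.id in pack  # membership is looked up via the index
    pack.close()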

2571def pack_header_chunks(num_objects: int) -> Iterator[bytes]: 

2572 """Yield chunks for a pack header.""" 

2573 yield b"PACK" # Pack header 

2574 yield struct.pack(b">L", 2) # Pack version 

2575 yield struct.pack(b">L", num_objects) # Number of objects in pack 

2576 

2577 

2578def write_pack_header( 

2579 write: Callable[[bytes], int] | IO[bytes], num_objects: int 

2580) -> None: 

2581 """Write a pack header for the given number of objects.""" 

2582 write_fn: Callable[[bytes], int] 

2583 if hasattr(write, "write"): 

2584 write_fn = write.write 

2585 warnings.warn( 

2586 "write_pack_header() now takes a write rather than file argument", 

2587 DeprecationWarning, 

2588 stacklevel=2, 

2589 ) 

2590 else: 

2591 write_fn = write 

2592 for chunk in pack_header_chunks(num_objects): 

2593 write_fn(chunk) 

2594 

2595 
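# Worked example (not part of dulwich): the 12-byte header produced for a pack
# containing three objects -- magic, format version 2, then the object count.
def _example_pack_header() -> None:
    buf = BytesIO()
    write_pack_header(buf.write, 3)
    assert buf.getvalue() == b"PACK" + struct.pack(">L", 2) + struct.pack(">L", 3)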

2596def find_reusable_deltas( 

2597 container: PackedObjectContainer, 

2598 object_ids: Set[bytes], 

2599 *, 

2600 other_haves: Set[bytes] | None = None, 

2601 progress: Callable[..., None] | None = None, 

2602) -> Iterator[UnpackedObject]: 

2603 """Find deltas in a pack that can be reused. 

2604 

2605 Args: 

2606 container: Pack container to search for deltas 

2607 object_ids: Set of object IDs to find deltas for 

2608 other_haves: Set of other object IDs we have 

2609 progress: Optional progress reporting callback 

2610 

2611 Returns: 

2612 Iterator of UnpackedObject entries that can be reused 

2613 """ 

2614 if other_haves is None: 

2615 other_haves = set() 

2616 reused = 0 

2617 for i, unpacked in enumerate( 

2618 container.iter_unpacked_subset( 

2619 object_ids, allow_missing=True, convert_ofs_delta=True 

2620 ) 

2621 ): 

2622 if progress is not None and i % 1000 == 0: 

2623 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode()) 

2624 if unpacked.pack_type_num == REF_DELTA: 

2625 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore 

2626 if hexsha in object_ids or hexsha in other_haves: 

2627 yield unpacked 

2628 reused += 1 

2629 if progress is not None: 

2630 progress((f"found {reused} deltas to reuse\n").encode()) 

2631 

2632 

2633def deltify_pack_objects( 

2634 objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, bytes | None]], 

2635 *, 

2636 window_size: int | None = None, 

2637 progress: Callable[..., None] | None = None, 

2638) -> Iterator[UnpackedObject]: 

2639 """Generate deltas for pack objects. 

2640 

2641 Args: 

2642 objects: An iterable of objects, or of (object, path) tuples, to deltify. 

2643 window_size: Window size; None for default 

2644 progress: Optional progress reporting callback 

2645 Returns: Iterator over UnpackedObject entries; delta_base is None for 

2646 full-text (non-delta) entries 

2647 """ 

2648 

2649 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, bytes | None]]]: 

2650 for e in objects: 

2651 if isinstance(e, ShaFile): 

2652 yield (e, (e.type_num, None)) 

2653 else: 

2654 yield (e[0], (e[0].type_num, e[1])) 

2655 

2656 sorted_objs = sort_objects_for_delta(objects_with_hints()) 

2657 yield from deltas_from_sorted_objects( 

2658 sorted_objs, 

2659 window_size=window_size, 

2660 progress=progress, 

2661 ) 

2662 

2663 

2664def sort_objects_for_delta( 

2665 objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, PackHint | None]], 

2666) -> Iterator[tuple[ShaFile, bytes | None]]: 

2667 """Sort objects for optimal delta compression. 

2668 

2669 Args: 

2670 objects: Iterator of objects or (object, hint) tuples 

2671 

2672 Returns: 

2673 Iterator of sorted (ShaFile, path) tuples 

2674 """ 

2675 magic = [] 

2676 for entry in objects: 

2677 if isinstance(entry, tuple): 

2678 obj, hint = entry 

2679 if hint is None: 

2680 type_num = None 

2681 path = None 

2682 else: 

2683 (type_num, path) = hint 

2684 else: 

2685 obj = entry 

2686 type_num = None 

2687 path = None 

2688 magic.append((type_num, path, -obj.raw_length(), obj)) 

2689 # Build a list of objects ordered by the magic Linus heuristic 

2690 # This helps us find good objects to diff against us 

2691 magic.sort() 

2692 return ((x[3], x[1]) for x in magic) 

2693 

2694 

2695def deltas_from_sorted_objects( 

2696 objects: Iterator[tuple[ShaFile, bytes | None]], 

2697 window_size: int | None = None, 

2698 progress: Callable[..., None] | None = None, 

2699) -> Iterator[UnpackedObject]: 

2700 """Create deltas from sorted objects. 

2701 

2702 Args: 

2703 objects: Iterator of sorted objects to deltify 

2704 window_size: Delta window size; None for default 

2705 progress: Optional progress reporting callback 

2706 

2707 Returns: 

2708 Iterator of UnpackedObject entries 

2709 """ 

2710 # TODO(jelmer): Use threads 

2711 if window_size is None: 

2712 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE 

2713 

2714 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque() 

2715 for i, (o, path) in enumerate(objects): 

2716 if progress is not None and i % 1000 == 0: 

2717 progress((f"generating deltas: {i}\r").encode()) 

2718 raw = o.as_raw_chunks() 

2719 winner = raw 

2720 winner_len = sum(map(len, winner)) 

2721 winner_base = None 

2722 for base_id, base_type_num, base in possible_bases: 

2723 if base_type_num != o.type_num: 

2724 continue 

2725 delta_len = 0 

2726 delta = [] 

2727 for chunk in create_delta(b"".join(base), b"".join(raw)): 

2728 delta_len += len(chunk) 

2729 if delta_len >= winner_len: 

2730 break 

2731 delta.append(chunk) 

2732 else: 

2733 winner_base = base_id 

2734 winner = delta 

2735 winner_len = sum(map(len, winner)) 

2736 yield UnpackedObject( 

2737 o.type_num, 

2738 sha=o.sha().digest(), 

2739 delta_base=winner_base, 

2740 decomp_len=winner_len, 

2741 decomp_chunks=winner, 

2742 ) 

2743 possible_bases.appendleft((o.sha().digest(), o.type_num, raw)) 

2744 while len(possible_bases) > window_size: 

2745 possible_bases.pop() 

2746 

2747 
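# Illustrative sketch (not part of dulwich): deltifying two similar blobs. Blob
# lives in dulwich.objects. Entries with delta_base None are stored whole; when a
# delta wins, delta_base holds the binary SHA of the chosen base. With content
# this similar a delta is normally chosen, but the SequenceMatcher heuristic may
# fall back to storing the full object.
def _example_deltify() -> None:
    from .objects import Blob

    base = Blob.from_string(b"".join(b"line %d\n" % i for i in range(100)))
    edit = Blob.from_string(base.data + b"line 100\n")
    for unpacked in deltify_pack_objects(iter([base, edit])):
        kind = "full" if unpacked.delta_base is None else "delta"
        print(sha_to_hex(unpacked.sha()), kind)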

2748def pack_objects_to_data( 

2749 objects: Sequence[ShaFile] 

2750 | Sequence[tuple[ShaFile, bytes | None]] 

2751 | Sequence[tuple[ShaFile, PackHint | None]], 

2752 *, 

2753 deltify: bool | None = None, 

2754 delta_window_size: int | None = None, 

2755 ofs_delta: bool = True, 

2756 progress: Callable[..., None] | None = None, 

2757) -> tuple[int, Iterator[UnpackedObject]]: 

2758 """Create pack data from objects. 

2759 

2760 Args: 

2761 objects: Pack objects 

2762 deltify: Whether to deltify pack objects 

2763 delta_window_size: Delta window size 

2764 ofs_delta: Whether to use offset deltas 

2765 progress: Optional progress reporting callback 

2766 Returns: Tuple of (number of objects, iterator over UnpackedObject entries) 

2767 """ 

2768 count = len(objects) 

2769 if deltify is None: 

2770 # PERFORMANCE/TODO(jelmer): This should be enabled but the python 

2771 # implementation is *much* too slow at the moment. 

2772 # Maybe consider enabling it just if the rust extension is available? 

2773 deltify = False 

2774 if deltify: 

2775 return ( 

2776 count, 

2777 deltify_pack_objects( 

2778 iter(objects), # type: ignore 

2779 window_size=delta_window_size, 

2780 progress=progress, 

2781 ), 

2782 ) 

2783 else: 

2784 

2785 def iter_without_path() -> Iterator[UnpackedObject]: 

2786 for o in objects: 

2787 if isinstance(o, tuple): 

2788 yield full_unpacked_object(o[0]) 

2789 else: 

2790 yield full_unpacked_object(o) 

2791 

2792 return (count, iter_without_path()) 

2793 

2794 

2795def generate_unpacked_objects( 

2796 container: PackedObjectContainer, 

2797 object_ids: Sequence[tuple[ObjectID, PackHint | None]], 

2798 delta_window_size: int | None = None, 

2799 deltify: bool | None = None, 

2800 reuse_deltas: bool = True, 

2801 ofs_delta: bool = True, 

2802 other_haves: set[bytes] | None = None, 

2803 progress: Callable[..., None] | None = None, 

2804) -> Iterator[UnpackedObject]: 

2805 """Create pack data from objects. 

2806 

2807 Returns: Iterator over UnpackedObject entries for the requested object_ids 

2808 """ 

2809 todo = dict(object_ids) 

2810 if reuse_deltas: 

2811 for unpack in find_reusable_deltas( 

2812 container, set(todo), other_haves=other_haves, progress=progress 

2813 ): 

2814 del todo[sha_to_hex(unpack.sha())] 

2815 yield unpack 

2816 if deltify is None: 

2817 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2818 # slow at the moment. 

2819 deltify = False 

2820 if deltify: 

2821 objects_to_delta = container.iterobjects_subset( 

2822 todo.keys(), allow_missing=False 

2823 ) 

2824 sorted_objs = sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta) 

2825 yield from deltas_from_sorted_objects( 

2826 sorted_objs, 

2827 window_size=delta_window_size, 

2828 progress=progress, 

2829 ) 

2830 else: 

2831 for oid in todo: 

2832 yield full_unpacked_object(container[oid]) 

2833 

2834 

2835def full_unpacked_object(o: ShaFile) -> UnpackedObject: 

2836 """Create an UnpackedObject from a ShaFile. 

2837 

2838 Args: 

2839 o: ShaFile object to convert 

2840 

2841 Returns: 

2842 UnpackedObject with full object data 

2843 """ 

2844 return UnpackedObject( 

2845 o.type_num, 

2846 delta_base=None, 

2847 crc32=None, 

2848 decomp_chunks=o.as_raw_chunks(), 

2849 sha=o.sha().digest(), 

2850 ) 

2851 

2852 

2853def write_pack_from_container( 

2854 write: Callable[[bytes], None] 

2855 | Callable[[bytes | bytearray | memoryview], int] 

2856 | IO[bytes], 

2857 container: PackedObjectContainer, 

2858 object_ids: Sequence[tuple[ObjectID, PackHint | None]], 

2859 delta_window_size: int | None = None, 

2860 deltify: bool | None = None, 

2861 reuse_deltas: bool = True, 

2862 compression_level: int = -1, 

2863 other_haves: set[bytes] | None = None, 

2864) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

2865 """Write a new pack data file. 

2866 

2867 Args: 

2868 write: write function to use 

2869 container: PackedObjectContainer 

2870 object_ids: Sequence of (object_id, hint) tuples to write 

2871 delta_window_size: Sliding window size for searching for deltas; 

2872 Set to None for default window size. 

2873 deltify: Whether to deltify objects 

2874 reuse_deltas: Whether to reuse existing deltas 

2875 compression_level: the zlib compression level to use 

2876 other_haves: Set of additional object IDs the receiver has 

2877 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2878 """ 

2879 pack_contents_count = len(object_ids) 

2880 pack_contents = generate_unpacked_objects( 

2881 container, 

2882 object_ids, 

2883 delta_window_size=delta_window_size, 

2884 deltify=deltify, 

2885 reuse_deltas=reuse_deltas, 

2886 other_haves=other_haves, 

2887 ) 

2888 

2889 return write_pack_data( 

2890 write, 

2891 pack_contents, 

2892 num_records=pack_contents_count, 

2893 compression_level=compression_level, 

2894 ) 

2895 

2896 

2897def write_pack_objects( 

2898 write: Callable[[bytes], None] | IO[bytes], 

2899 objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]], 

2900 *, 

2901 delta_window_size: int | None = None, 

2902 deltify: bool | None = None, 

2903 compression_level: int = -1, 

2904) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

2905 """Write a new pack data file. 

2906 

2907 Args: 

2908 write: write function to use 

2909 objects: Sequence of objects or (object, path) tuples to write 

2910 delta_window_size: Sliding window size for searching for deltas; 

2911 Set to None for default window size. 

2912 deltify: Whether to deltify objects 

2913 compression_level: the zlib compression level to use 

2914 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2915 """ 

2916 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify) 

2917 

2918 return write_pack_data( 

2919 write, 

2920 pack_contents, 

2921 num_records=pack_contents_count, 

2922 compression_level=compression_level, 

2923 ) 

2924 

2925 

2926class PackChunkGenerator: 

2927 """Generator for pack data chunks.""" 

2928 

2929 def __init__( 

2930 self, 

2931 num_records: int | None = None, 

2932 records: Iterator[UnpackedObject] | None = None, 

2933 progress: Callable[..., None] | None = None, 

2934 compression_level: int = -1, 

2935 reuse_compressed: bool = True, 

2936 ) -> None: 

2937 """Initialize PackChunkGenerator. 

2938 

2939 Args: 

2940 num_records: Expected number of records 

2941 records: Iterator of pack records 

2942 progress: Optional progress callback 

2943 compression_level: Compression level (-1 for default) 

2944 reuse_compressed: Whether to reuse compressed chunks 

2945 """ 

2946 self.cs = sha1(b"") 

2947 self.entries: dict[bytes, tuple[int, int]] = {} 

2948 if records is None: 

2949 records = iter([]) # Empty iterator if None 

2950 self._it = self._pack_data_chunks( 

2951 records=records, 

2952 num_records=num_records, 

2953 progress=progress, 

2954 compression_level=compression_level, 

2955 reuse_compressed=reuse_compressed, 

2956 ) 

2957 

2958 def sha1digest(self) -> bytes: 

2959 """Return the SHA1 digest of the pack data.""" 

2960 return self.cs.digest() 

2961 

2962 def __iter__(self) -> Iterator[bytes]: 

2963 """Iterate over pack data chunks.""" 

2964 return self._it 

2965 

2966 def _pack_data_chunks( 

2967 self, 

2968 records: Iterator[UnpackedObject], 

2969 *, 

2970 num_records: int | None = None, 

2971 progress: Callable[..., None] | None = None, 

2972 compression_level: int = -1, 

2973 reuse_compressed: bool = True, 

2974 ) -> Iterator[bytes]: 

2975 """Iterate pack data file chunks. 

2976 

2977 Args: 

2978 records: Iterator over UnpackedObject 

2979 num_records: Number of records (defaults to len(records) if not specified) 

2980 progress: Function to report progress to 

2981 compression_level: the zlib compression level 

2982 reuse_compressed: Whether to reuse compressed chunks 

2983 Yields: Pack data file chunks; (offset, crc32) per object is recorded in self.entries 

2984 """ 

2985 # Write the pack 

2986 if num_records is None: 

2987 num_records = len(records) # type: ignore 

2988 offset = 0 

2989 for chunk in pack_header_chunks(num_records): 

2990 yield chunk 

2991 self.cs.update(chunk) 

2992 offset += len(chunk) 

2993 actual_num_records = 0 

2994 for i, unpacked in enumerate(records): 

2995 type_num = unpacked.pack_type_num 

2996 if progress is not None and i % 1000 == 0: 

2997 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii")) 

2998 raw: list[bytes] | tuple[int, list[bytes]] | tuple[bytes, list[bytes]] 

2999 if unpacked.delta_base is not None: 

3000 assert isinstance(unpacked.delta_base, bytes), ( 

3001 f"Expected bytes, got {type(unpacked.delta_base)}" 

3002 ) 

3003 try: 

3004 base_offset, _base_crc32 = self.entries[unpacked.delta_base] 

3005 except KeyError: 

3006 type_num = REF_DELTA 

3007 assert isinstance(unpacked.delta_base, bytes) 

3008 raw = (unpacked.delta_base, unpacked.decomp_chunks) 

3009 else: 

3010 type_num = OFS_DELTA 

3011 raw = (offset - base_offset, unpacked.decomp_chunks) 

3012 else: 

3013 raw = unpacked.decomp_chunks 

3014 chunks: list[bytes] | Iterator[bytes] 

3015 if unpacked.comp_chunks is not None and reuse_compressed: 

3016 chunks = unpacked.comp_chunks 

3017 else: 

3018 chunks = pack_object_chunks( 

3019 type_num, raw, compression_level=compression_level 

3020 ) 

3021 crc32 = 0 

3022 object_size = 0 

3023 for chunk in chunks: 

3024 yield chunk 

3025 crc32 = binascii.crc32(chunk, crc32) 

3026 self.cs.update(chunk) 

3027 object_size += len(chunk) 

3028 actual_num_records += 1 

3029 self.entries[unpacked.sha()] = (offset, crc32) 

3030 offset += object_size 

3031 if actual_num_records != num_records: 

3032 raise AssertionError( 

3033 f"actual records written differs: {actual_num_records} != {num_records}" 

3034 ) 

3035 

3036 yield self.cs.digest() 

3037 

3038 

3039def write_pack_data( 

3040 write: Callable[[bytes], None] 

3041 | Callable[[bytes | bytearray | memoryview], int] 

3042 | IO[bytes], 

3043 records: Iterator[UnpackedObject], 

3044 *, 

3045 num_records: int | None = None, 

3046 progress: Callable[..., None] | None = None, 

3047 compression_level: int = -1, 

3048) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

3049 """Write a new pack data file. 

3050 

3051 Args: 

3052 write: Write function to use 

3053 num_records: Number of records (defaults to len(records) if None) 

3054 records: Iterator over UnpackedObject entries to write 

3055 progress: Function to report progress to 

3056 compression_level: the zlib compression level 

3057 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3058 """ 

3059 chunk_generator = PackChunkGenerator( 

3060 num_records=num_records, 

3061 records=records, 

3062 progress=progress, 

3063 compression_level=compression_level, 

3064 ) 

3065 for chunk in chunk_generator: 

3066 if callable(write): 

3067 write(chunk) 

3068 else: 

3069 write.write(chunk) 

3070 return chunk_generator.entries, chunk_generator.sha1digest() 

3071 

3072 

3073def write_pack_index_v1( 

3074 f: IO[bytes], 

3075 entries: Iterable[tuple[bytes, int, int | None]], 

3076 pack_checksum: bytes, 

3077) -> bytes: 

3078 """Write a new pack index file. 

3079 

3080 Args: 

3081 f: A file-like object to write to 

3082 entries: List of tuples with object name (sha), offset_in_pack, 

3083 and crc32_checksum. 

3084 pack_checksum: Checksum of the pack file. 

3085 Returns: The SHA of the written index file 

3086 """ 

3087 f = SHA1Writer(f) 

3088 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3089 for name, _offset, _entry_checksum in entries: 

3090 fan_out_table[ord(name[:1])] += 1 

3091 # Fan-out table 

3092 for i in range(0x100): 

3093 f.write(struct.pack(">L", fan_out_table[i])) 

3094 fan_out_table[i + 1] += fan_out_table[i] 

3095 for name, offset, _entry_checksum in entries: 

3096 if not (offset <= 0xFFFFFFFF): 

3097 raise TypeError("pack index format 1 only supports offsets < 4 GiB") 

3098 f.write(struct.pack(">L20s", offset, name)) 

3099 assert len(pack_checksum) == 20 

3100 f.write(pack_checksum) 

3101 return f.write_sha() 

3102 

3103 

3104def _delta_encode_size(size: int) -> bytes: 

3105 ret = bytearray() 

3106 c = size & 0x7F 

3107 size >>= 7 

3108 while size: 

3109 ret.append(c | 0x80) 

3110 c = size & 0x7F 

3111 size >>= 7 

3112 ret.append(c) 

3113 return bytes(ret) 

3114 

3115 
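# Worked example (not part of dulwich): the little-endian base-128 size encoding
# used in delta headers. For 200 = 0b1_1001000, the low 7 bits go out first with
# the continuation flag set, followed by the remaining high bit.
assert _delta_encode_size(200) == b"\xc8\x01"
# 0xC8 = 0x80 | (200 & 0x7F); 0x01 = 200 >> 7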

3116# The length of delta compression copy operations in version 2 packs is limited 

3117# to 64K. To copy more, we use several copy operations. Version 3 packs allow 

3118# 24-bit lengths in copy operations, but we always make version 2 packs. 

3119_MAX_COPY_LEN = 0xFFFF 

3120 

3121 

3122def _encode_copy_operation(start: int, length: int) -> bytes: 

3123 scratch = bytearray([0x80]) 

3124 for i in range(4): 

3125 if start & 0xFF << i * 8: 

3126 scratch.append((start >> i * 8) & 0xFF) 

3127 scratch[0] |= 1 << i 

3128 for i in range(2): 

3129 if length & 0xFF << i * 8: 

3130 scratch.append((length >> i * 8) & 0xFF) 

3131 scratch[0] |= 1 << (4 + i) 

3132 return bytes(scratch) 

3133 

3134 

3135def _create_delta_py(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3136 """Use python difflib to work out how to transform base_buf to target_buf. 

3137 

3138 Args: 

3139 base_buf: Base buffer 

3140 target_buf: Target buffer 

3141 """ 

3142 if isinstance(base_buf, list): 

3143 base_buf = b"".join(base_buf) 

3144 if isinstance(target_buf, list): 

3145 target_buf = b"".join(target_buf) 

3146 assert isinstance(base_buf, bytes) 

3147 assert isinstance(target_buf, bytes) 

3148 # write delta header 

3149 yield _delta_encode_size(len(base_buf)) 

3150 yield _delta_encode_size(len(target_buf)) 

3151 # write out delta opcodes 

3152 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf) 

3153 for opcode, i1, i2, j1, j2 in seq.get_opcodes(): 

3154 # Git patch opcodes don't care about deletes! 

3155 # if opcode == 'replace' or opcode == 'delete': 

3156 # pass 

3157 if opcode == "equal": 

3158 # If they are equal, unpacker will use data from base_buf 

3159 # Write out an opcode that says what range to use 

3160 copy_start = i1 

3161 copy_len = i2 - i1 

3162 while copy_len > 0: 

3163 to_copy = min(copy_len, _MAX_COPY_LEN) 

3164 yield _encode_copy_operation(copy_start, to_copy) 

3165 copy_start += to_copy 

3166 copy_len -= to_copy 

3167 if opcode == "replace" or opcode == "insert": 

3168 # If we are replacing a range or adding one, then we just 

3169 # output it to the stream (prefixed by its size) 

3170 s = j2 - j1 

3171 o = j1 

3172 while s > 127: 

3173 yield bytes([127]) 

3174 yield bytes(memoryview(target_buf)[o : o + 127]) 

3175 s -= 127 

3176 o += 127 

3177 yield bytes([s]) 

3178 yield bytes(memoryview(target_buf)[o : o + s]) 

3179 

3180 

3181# Default to pure Python implementation 

3182create_delta = _create_delta_py 

3183 

3184 

3185def apply_delta( 

3186 src_buf: bytes | list[bytes], delta: bytes | list[bytes] 

3187) -> list[bytes]: 

3188 """Based on the similar function in git's patch-delta.c. 

3189 

3190 Args: 

3191 src_buf: Source buffer 

3192 delta: Delta instructions 

3193 """ 

3194 if not isinstance(src_buf, bytes): 

3195 src_buf = b"".join(src_buf) 

3196 if not isinstance(delta, bytes): 

3197 delta = b"".join(delta) 

3198 out = [] 

3199 index = 0 

3200 delta_length = len(delta) 

3201 

3202 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]: 

3203 size = 0 

3204 i = 0 

3205 while delta: 

3206 cmd = ord(delta[index : index + 1]) 

3207 index += 1 

3208 size |= (cmd & ~0x80) << i 

3209 i += 7 

3210 if not cmd & 0x80: 

3211 break 

3212 return size, index 

3213 

3214 src_size, index = get_delta_header_size(delta, index) 

3215 dest_size, index = get_delta_header_size(delta, index) 

3216 if src_size != len(src_buf): 

3217 raise ApplyDeltaError( 

3218 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}" 

3219 ) 

3220 while index < delta_length: 

3221 cmd = ord(delta[index : index + 1]) 

3222 index += 1 

3223 if cmd & 0x80: 

3224 cp_off = 0 

3225 for i in range(4): 

3226 if cmd & (1 << i): 

3227 x = ord(delta[index : index + 1]) 

3228 index += 1 

3229 cp_off |= x << (i * 8) 

3230 cp_size = 0 

3231 # Version 3 packs can contain copy sizes larger than 64K. 

3232 for i in range(3): 

3233 if cmd & (1 << (4 + i)): 

3234 x = ord(delta[index : index + 1]) 

3235 index += 1 

3236 cp_size |= x << (i * 8) 

3237 if cp_size == 0: 

3238 cp_size = 0x10000 

3239 if ( 

3240 cp_off + cp_size < cp_size 

3241 or cp_off + cp_size > src_size 

3242 or cp_size > dest_size 

3243 ): 

3244 break 

3245 out.append(src_buf[cp_off : cp_off + cp_size]) 

3246 elif cmd != 0: 

3247 out.append(delta[index : index + cmd]) 

3248 index += cmd 

3249 else: 

3250 raise ApplyDeltaError("Invalid opcode 0") 

3251 

3252 if index != delta_length: 

3253 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}") 

3254 

3255 if dest_size != chunks_length(out): 

3256 raise ApplyDeltaError("dest size incorrect") 

3257 

3258 return out 

3259 

3260 
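# Worked example (not part of dulwich): a create_delta / apply_delta round trip.
# Whatever mix of copy and insert opcodes create_delta emits, applying the delta
# to the original base must reproduce the target exactly.
_base = b"the quick brown fox jumps over the lazy dog\n" * 3
_target = _base + b"and then goes to sleep\n"
_delta = b"".join(create_delta(_base, _target))
assert b"".join(apply_delta(_base, _delta)) == _target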

3261def write_pack_index_v2( 

3262 f: IO[bytes], 

3263 entries: Iterable[tuple[bytes, int, int | None]], 

3264 pack_checksum: bytes, 

3265) -> bytes: 

3266 """Write a new pack index file. 

3267 

3268 Args: 

3269 f: File-like object to write to 

3270 entries: List of tuples with object name (sha), offset_in_pack, and 

3271 crc32_checksum. 

3272 pack_checksum: Checksum of the pack file. 

3273 Returns: The SHA of the index file written 

3274 """ 

3275 f = SHA1Writer(f) 

3276 f.write(b"\377tOc") # Magic! 

3277 f.write(struct.pack(">L", 2)) 

3278 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3279 for name, offset, entry_checksum in entries: 

3280 fan_out_table[ord(name[:1])] += 1 

3281 # Fan-out table 

3282 largetable: list[int] = [] 

3283 for i in range(0x100): 

3284 f.write(struct.pack(b">L", fan_out_table[i])) 

3285 fan_out_table[i + 1] += fan_out_table[i] 

3286 for name, offset, entry_checksum in entries: 

3287 f.write(name) 

3288 for name, offset, entry_checksum in entries: 

3289 f.write(struct.pack(b">L", entry_checksum)) 

3290 for name, offset, entry_checksum in entries: 

3291 if offset < 2**31: 

3292 f.write(struct.pack(b">L", offset)) 

3293 else: 

3294 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3295 largetable.append(offset) 

3296 for offset in largetable: 

3297 f.write(struct.pack(b">Q", offset)) 

3298 assert len(pack_checksum) == 20 

3299 f.write(pack_checksum) 

3300 return f.write_sha() 

3301 

3302 
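# Illustrative sketch (not part of dulwich): writing a tiny v2 index for two
# synthetic entries into memory. Entries must be sorted by SHA, and the pack
# checksum here is a placeholder rather than a real pack digest.
def _example_index_v2() -> None:
    entries = sorted(
        (sha1(payload).digest(), offset, 0)
        for payload, offset in ((b"a", 12), (b"b", 40))
    )
    buf = BytesIO()
    write_pack_index_v2(buf, entries, pack_checksum=b"\x00" * 20)
    assert buf.getvalue()[:4] == b"\377tOc"  # index magic shared by v2 and v3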

3303def write_pack_index_v3( 

3304 f: IO[bytes], 

3305 entries: Iterable[tuple[bytes, int, int | None]], 

3306 pack_checksum: bytes, 

3307 hash_algorithm: int = 1, 

3308) -> bytes: 

3309 """Write a new pack index file in v3 format. 

3310 

3311 Args: 

3312 f: File-like object to write to 

3313 entries: List of tuples with object name (sha), offset_in_pack, and 

3314 crc32_checksum. 

3315 pack_checksum: Checksum of the pack file. 

3316 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

3317 Returns: The SHA of the index file written 

3318 """ 

3319 if hash_algorithm == 1: 

3320 hash_size = 20 # SHA-1 

3321 writer_cls = SHA1Writer 

3322 elif hash_algorithm == 2: 

3323 hash_size = 32 # SHA-256 

3324 # TODO: Add SHA256Writer when SHA-256 support is implemented 

3325 raise NotImplementedError("SHA-256 support not yet implemented") 

3326 else: 

3327 raise ValueError(f"Unknown hash algorithm {hash_algorithm}") 

3328 

3329 # Convert entries to list to allow multiple iterations 

3330 entries_list = list(entries) 

3331 

3332 # Calculate shortest unambiguous prefix length for object names 

3333 # For now, use full hash size (this could be optimized) 

3334 shortened_oid_len = hash_size 

3335 

3336 f = writer_cls(f) 

3337 f.write(b"\377tOc") # Magic! 

3338 f.write(struct.pack(">L", 3)) # Version 3 

3339 f.write(struct.pack(">L", hash_algorithm)) # Hash algorithm 

3340 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length 

3341 

3342 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3343 for name, offset, entry_checksum in entries_list: 

3344 if len(name) != hash_size: 

3345 raise ValueError( 

3346 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3347 ) 

3348 fan_out_table[ord(name[:1])] += 1 

3349 

3350 # Fan-out table 

3351 largetable: list[int] = [] 

3352 for i in range(0x100): 

3353 f.write(struct.pack(b">L", fan_out_table[i])) 

3354 fan_out_table[i + 1] += fan_out_table[i] 

3355 

3356 # Object names table 

3357 for name, offset, entry_checksum in entries_list: 

3358 f.write(name) 

3359 

3360 # CRC32 checksums table 

3361 for name, offset, entry_checksum in entries_list: 

3362 f.write(struct.pack(b">L", entry_checksum)) 

3363 

3364 # Offset table 

3365 for name, offset, entry_checksum in entries_list: 

3366 if offset < 2**31: 

3367 f.write(struct.pack(b">L", offset)) 

3368 else: 

3369 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3370 largetable.append(offset) 

3371 

3372 # Large offset table 

3373 for offset in largetable: 

3374 f.write(struct.pack(b">Q", offset)) 

3375 

3376 assert len(pack_checksum) == hash_size, ( 

3377 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}" 

3378 ) 

3379 f.write(pack_checksum) 

3380 return f.write_sha() 

3381 

3382 

3383def write_pack_index( 

3384 f: IO[bytes], 

3385 entries: Iterable[tuple[bytes, int, int | None]], 

3386 pack_checksum: bytes, 

3387 progress: Callable[..., None] | None = None, 

3388 version: int | None = None, 

3389) -> bytes: 

3390 """Write a pack index file. 

3391 

3392 Args: 

3393 f: File-like object to write to. 

3394 entries: List of (checksum, offset, crc32) tuples 

3395 pack_checksum: Checksum of the pack file. 

3396 progress: Progress function (not currently used) 

3397 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION. 

3398 

3399 Returns: 

3400 SHA of the written index file 

3401 """ 

3402 if version is None: 

3403 version = DEFAULT_PACK_INDEX_VERSION 

3404 

3405 if version == 1: 

3406 return write_pack_index_v1(f, entries, pack_checksum) 

3407 elif version == 2: 

3408 return write_pack_index_v2(f, entries, pack_checksum) 

3409 elif version == 3: 

3410 return write_pack_index_v3(f, entries, pack_checksum) 

3411 else: 

3412 raise ValueError(f"Unsupported pack index version: {version}") 

3413 

3414 

3415class Pack: 

3416 """A Git pack object.""" 

3417 

3418 _data_load: Callable[[], PackData] | None 

3419 _idx_load: Callable[[], PackIndex] | None 

3420 

3421 _data: PackData | None 

3422 _idx: PackIndex | None 

3423 _bitmap: "PackBitmap | None" 

3424 

3425 def __init__( 

3426 self, 

3427 basename: str, 

3428 resolve_ext_ref: ResolveExtRefFn | None = None, 

3429 *, 

3430 delta_window_size: int | None = None, 

3431 window_memory: int | None = None, 

3432 delta_cache_size: int | None = None, 

3433 depth: int | None = None, 

3434 threads: int | None = None, 

3435 big_file_threshold: int | None = None, 

3436 ) -> None: 

3437 """Initialize a Pack object. 

3438 

3439 Args: 

3440 basename: Base path for pack files (without .pack/.idx extension) 

3441 resolve_ext_ref: Optional function to resolve external references 

3442 delta_window_size: Size of the delta compression window 

3443 window_memory: Memory limit for delta compression window 

3444 delta_cache_size: Size of the delta cache 

3445 depth: Maximum depth for delta chains 

3446 threads: Number of threads to use for operations 

3447 big_file_threshold: Size threshold for big file handling 

3448 """ 

3449 self._basename = basename 

3450 self._data = None 

3451 self._idx = None 

3452 self._bitmap = None 

3453 self._idx_path = self._basename + ".idx" 

3454 self._data_path = self._basename + ".pack" 

3455 self._bitmap_path = self._basename + ".bitmap" 

3456 self.delta_window_size = delta_window_size 

3457 self.window_memory = window_memory 

3458 self.delta_cache_size = delta_cache_size 

3459 self.depth = depth 

3460 self.threads = threads 

3461 self.big_file_threshold = big_file_threshold 

3462 self._data_load = lambda: PackData( 

3463 self._data_path, 

3464 delta_window_size=delta_window_size, 

3465 window_memory=window_memory, 

3466 delta_cache_size=delta_cache_size, 

3467 depth=depth, 

3468 threads=threads, 

3469 big_file_threshold=big_file_threshold, 

3470 ) 

3471 self._idx_load = lambda: load_pack_index(self._idx_path) 

3472 self.resolve_ext_ref = resolve_ext_ref 

3473 

3474 @classmethod 

3475 def from_lazy_objects( 

3476 cls, data_fn: Callable[[], PackData], idx_fn: Callable[[], PackIndex] 

3477 ) -> "Pack": 

3478 """Create a new pack object from callables to load pack data and index objects.""" 

3479 ret = cls("") 

3480 ret._data_load = data_fn 

3481 ret._idx_load = idx_fn 

3482 return ret 

3483 

3484 @classmethod 

3485 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack": 

3486 """Create a new pack object from pack data and index objects.""" 

3487 ret = cls("") 

3488 ret._data = data 

3489 ret._data_load = None 

3490 ret._idx = idx 

3491 ret._idx_load = None 

3492 ret.check_length_and_checksum() 

3493 return ret 

3494 

3495 def name(self) -> bytes: 

3496 """The SHA over the SHAs of the objects in this pack.""" 

3497 return self.index.objects_sha1() 

3498 

3499 @property 

3500 def data(self) -> PackData: 

3501 """The pack data object being used.""" 

3502 if self._data is None: 

3503 assert self._data_load 

3504 self._data = self._data_load() 

3505 self.check_length_and_checksum() 

3506 return self._data 

3507 

3508 @property 

3509 def index(self) -> PackIndex: 

3510 """The index being used. 

3511 

3512 Note: This may be an in-memory index 

3513 """ 

3514 if self._idx is None: 

3515 assert self._idx_load 

3516 self._idx = self._idx_load() 

3517 return self._idx 

3518 

3519 @property 

3520 def bitmap(self) -> "PackBitmap | None": 

3521 """The bitmap being used, if available. 

3522 

3523 Returns: 

3524 PackBitmap instance or None if no bitmap exists 

3525 

3526 Raises: 

3527 ValueError: If bitmap file is invalid or corrupt 

3528 """ 

3529 if self._bitmap is None: 

3530 from .bitmap import read_bitmap 

3531 

3532 self._bitmap = read_bitmap(self._bitmap_path, pack_index=self.index) 

3533 return self._bitmap 

3534 

3535 def ensure_bitmap( 

3536 self, 

3537 object_store: "BaseObjectStore", 

3538 refs: dict[bytes, bytes], 

3539 commit_interval: int | None = None, 

3540 progress: Callable[[str], None] | None = None, 

3541 ) -> "PackBitmap": 

3542 """Ensure a bitmap exists for this pack, generating one if needed. 

3543 

3544 Args: 

3545 object_store: Object store to read objects from 

3546 refs: Dictionary of ref names to commit SHAs 

3547 commit_interval: Include every Nth commit in bitmap index 

3548 progress: Optional progress reporting callback 

3549 

3550 Returns: 

3551 PackBitmap instance (either existing or newly generated) 

3552 """ 

3553 from .bitmap import generate_bitmap, write_bitmap 

3554 

3555 # Check if bitmap already exists 

3556 try: 

3557 existing = self.bitmap 

3558 if existing is not None: 

3559 return existing 

3560 except FileNotFoundError: 

3561 pass # No bitmap, we'll generate one 

3562 

3563 # Generate new bitmap 

3564 if progress: 

3565 progress(f"Generating bitmap for {self.name().decode('utf-8')}...\n") 

3566 

3567 pack_bitmap = generate_bitmap( 

3568 self.index, 

3569 object_store, 

3570 refs, 

3571 self.get_stored_checksum(), 

3572 commit_interval=commit_interval, 

3573 progress=progress, 

3574 ) 

3575 

3576 # Write bitmap file 

3577 write_bitmap(self._bitmap_path, pack_bitmap) 

3578 

3579 if progress: 

3580 progress(f"Wrote {self._bitmap_path}\n") 

3581 

3582 # Update cached bitmap 

3583 self._bitmap = pack_bitmap 

3584 

3585 return pack_bitmap 

3586 
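# Hedged usage sketch for ensure_bitmap: the object store and refs normally come
# from the enclosing repository; the repo name below is illustrative only.
#
#     >>> bm = pack.ensure_bitmap(repo.object_store, repo.get_refs(),
#     ...                         progress=lambda msg: None)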

3587 def close(self) -> None: 

3588 """Close the pack file and index.""" 

3589 if self._data is not None: 

3590 self._data.close() 

3591 if self._idx is not None: 

3592 self._idx.close() 

3593 

3594 def __enter__(self) -> "Pack": 

3595 """Enter context manager.""" 

3596 return self 

3597 

3598 def __exit__( 

3599 self, 

3600 exc_type: type | None, 

3601 exc_val: BaseException | None, 

3602 exc_tb: TracebackType | None, 

3603 ) -> None: 

3604 """Exit context manager.""" 

3605 self.close() 

3606 

3607 def __eq__(self, other: object) -> bool: 

3608 """Check equality with another pack.""" 

3609 if not isinstance(other, Pack): 

3610 return False 

3611 return self.index == other.index 

3612 

3613 def __len__(self) -> int: 

3614 """Number of entries in this pack.""" 

3615 return len(self.index) 

3616 

3617 def __repr__(self) -> str: 

3618 """Return string representation of this pack.""" 

3619 return f"{self.__class__.__name__}({self._basename!r})" 

3620 

3621 def __iter__(self) -> Iterator[bytes]: 

3622 """Iterate over all the sha1s of the objects in this pack.""" 

3623 return iter(self.index) 

3624 

3625 def check_length_and_checksum(self) -> None: 

3626 """Sanity check the length and checksum of the pack index and data.""" 

3627 assert len(self.index) == len(self.data), ( 

3628 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)" 

3629 ) 

3630 idx_stored_checksum = self.index.get_pack_checksum() 

3631 data_stored_checksum = self.data.get_stored_checksum() 

3632 if ( 

3633 idx_stored_checksum is not None 

3634 and idx_stored_checksum != data_stored_checksum 

3635 ): 

3636 raise ChecksumMismatch( 

3637 sha_to_hex(idx_stored_checksum), 

3638 sha_to_hex(data_stored_checksum), 

3639 ) 

3640 

3641 def check(self) -> None: 

3642 """Check the integrity of this pack. 

3643 

3644 Raises: 

3645 ChecksumMismatch: if a checksum for the index or data is wrong 

3646 """ 

3647 self.index.check() 

3648 self.data.check() 

3649 for obj in self.iterobjects(): 

3650 obj.check() 

3651 # TODO: object connectivity checks 

3652 

3653 def get_stored_checksum(self) -> bytes: 

3654 """Return the stored checksum of the pack data.""" 

3655 return self.data.get_stored_checksum() 

3656 

3657 def pack_tuples(self) -> list[tuple[ShaFile, None]]: 

3658 """Return pack tuples for all objects in pack.""" 

3659 return [(o, None) for o in self.iterobjects()] 

3660 

3661 def __contains__(self, sha1: bytes) -> bool: 

3662 """Check whether this pack contains a particular SHA1.""" 

3663 try: 

3664 self.index.object_offset(sha1) 

3665 return True 

3666 except KeyError: 

3667 return False 

3668 

3669 def get_raw(self, sha1: bytes) -> tuple[int, bytes]: 

3670 """Get raw object data by SHA1.""" 

3671 offset = self.index.object_offset(sha1) 

3672 obj_type, obj = self.data.get_object_at(offset) 

3673 type_num, chunks = self.resolve_object(offset, obj_type, obj) 

3674 return type_num, b"".join(chunks) # type: ignore[arg-type] 

3675 

3676 def __getitem__(self, sha1: bytes) -> ShaFile: 

3677 """Retrieve the specified SHA1.""" 

3678 type, uncomp = self.get_raw(sha1) 

3679 return ShaFile.from_raw_string(type, uncomp, sha=sha1) 

3680 
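# Hedged lookup sketch: membership tests, full objects and raw data all go
# through the index offset (some_sha below is a hypothetical object id).
#
#     >>> if some_sha in pack:
#     ...     obj = pack[some_sha]                   # ShaFile, deltas resolved
#     ...     type_num, raw = pack.get_raw(some_sha)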

3681 def iterobjects(self) -> Iterator[ShaFile]: 

3682 """Iterate over the objects in this pack.""" 

3683 return iter( 

3684 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref) 

3685 ) 

3686 

3687 def iterobjects_subset( 

3688 self, shas: Iterable[ObjectID], *, allow_missing: bool = False 

3689 ) -> Iterator[ShaFile]: 

3690 """Iterate over a subset of objects in this pack.""" 

3691 return ( 

3692 uo 

3693 for uo in PackInflater.for_pack_subset( 

3694 self, 

3695 shas, 

3696 allow_missing=allow_missing, 

3697 resolve_ext_ref=self.resolve_ext_ref, 

3698 ) 

3699 if uo.id in shas 

3700 ) 

3701 

3702 def iter_unpacked_subset( 

3703 self, 

3704 shas: Iterable[ObjectID], 

3705 *, 

3706 include_comp: bool = False, 

3707 allow_missing: bool = False, 

3708 convert_ofs_delta: bool = False, 

3709 ) -> Iterator[UnpackedObject]: 

3710 """Iterate over unpacked objects in subset.""" 

3711 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list) 

3712 ofs: dict[int, bytes] = {} 

3713 todo = set(shas) 

3714 for unpacked in self.iter_unpacked(include_comp=include_comp): 

3715 sha = unpacked.sha() 

3716 if unpacked.offset is not None: 

3717 ofs[unpacked.offset] = sha 

3718 hexsha = sha_to_hex(sha) 

3719 if hexsha in todo: 

3720 if unpacked.pack_type_num == OFS_DELTA: 

3721 assert isinstance(unpacked.delta_base, int) 

3722 assert unpacked.offset is not None 

3723 base_offset = unpacked.offset - unpacked.delta_base 

3724 try: 

3725 unpacked.delta_base = ofs[base_offset] 

3726 except KeyError: 

3727 ofs_pending[base_offset].append(unpacked) 

3728 continue 

3729 else: 

3730 unpacked.pack_type_num = REF_DELTA 

3731 yield unpacked 

3732 todo.remove(hexsha) 

3733 if unpacked.offset is not None: 

3734 for child in ofs_pending.pop(unpacked.offset, []): 

3735 child.pack_type_num = REF_DELTA 

3736 child.delta_base = sha 

3737 yield child 

3738 assert not ofs_pending 

3739 if not allow_missing and todo: 

3740 raise UnresolvedDeltas(list(todo)) 

3741 

3742 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]: 

3743 """Iterate over all unpacked objects in this pack.""" 

3744 ofs_to_entries = { 

3745 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries() 

3746 } 

3747 for unpacked in self.data.iter_unpacked(include_comp=include_comp): 

3748 assert unpacked.offset is not None 

3749 (sha, crc32) = ofs_to_entries[unpacked.offset] 

3750 unpacked._sha = sha 

3751 unpacked.crc32 = crc32 

3752 yield unpacked 

3753 

3754 def keep(self, msg: bytes | None = None) -> str: 

3755 """Add a .keep file for the pack, preventing git from garbage collecting it. 

3756 

3757 Args: 

3758 msg: A message written inside the .keep file; can be used later 

3759 to determine whether or not a .keep file is obsolete. 

3760 Returns: The path of the .keep file, as a string. 

3761 """ 

3762 keepfile_name = f"{self._basename}.keep" 

3763 with GitFile(keepfile_name, "wb") as keepfile: 

3764 if msg: 

3765 keepfile.write(msg) 

3766 keepfile.write(b"\n") 

3767 return keepfile_name 

3768 
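# Hedged sketch: mark the pack as precious so repacking and garbage collection
# will not prune it; the message is optional and purely informational.
#
#     >>> keep_path = pack.keep(b"kept while a clone is in progress")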

3769 def get_ref(self, sha: bytes) -> tuple[int | None, int, OldUnpackedObject]: 

3770 """Get the object for a ref SHA, only looking in this pack.""" 

3771 # TODO: cache these results 

3772 try: 

3773 offset = self.index.object_offset(sha) 

3774 except KeyError: 

3775 offset = None 

3776 if offset: 

3777 type, obj = self.data.get_object_at(offset) 

3778 elif self.resolve_ext_ref: 

3779 type, obj = self.resolve_ext_ref(sha) 

3780 else: 

3781 raise KeyError(sha) 

3782 return offset, type, obj 

3783 

3784 def resolve_object( 

3785 self, 

3786 offset: int, 

3787 type: int, 

3788 obj: OldUnpackedObject, 

3789 get_ref: Callable[[bytes], tuple[int | None, int, OldUnpackedObject]] 

3790 | None = None, 

3791 ) -> tuple[int, OldUnpackedObject]: 

3792 """Resolve an object, possibly resolving deltas when necessary. 

3793 

3794 Returns: Tuple with object type and contents. 

3795 """ 

3796 # Walk down the delta chain, building a stack of deltas to reach 

3797 # the requested object. 

3798 base_offset = offset 

3799 base_type = type 

3800 base_obj = obj 

3801 delta_stack = [] 

3802 while base_type in DELTA_TYPES: 

3803 prev_offset = base_offset 

3804 if get_ref is None: 

3805 get_ref = self.get_ref 

3806 if base_type == OFS_DELTA: 

3807 (delta_offset, delta) = base_obj 

3808 # TODO: clean up asserts and replace with nicer error messages 

3809 assert isinstance(delta_offset, int), ( 

3810 f"Expected int, got {delta_offset.__class__}" 

3811 ) 

3812 base_offset = base_offset - delta_offset 

3813 base_type, base_obj = self.data.get_object_at(base_offset) 

3814 assert isinstance(base_type, int) 

3815 elif base_type == REF_DELTA: 

3816 (basename, delta) = base_obj 

3817 assert isinstance(basename, bytes) and len(basename) == 20 

3818 base_offset, base_type, base_obj = get_ref(basename) # type: ignore[assignment] 

3819 assert isinstance(base_type, int) 

3820 if base_offset == prev_offset: # object is based on itself 

3821 raise UnresolvedDeltas([basename]) 

3822 delta_stack.append((prev_offset, base_type, delta)) 

3823 

3824 # Now grab the base object (mustn't be a delta) and apply the 

3825 # deltas all the way up the stack. 

3826 chunks = base_obj 

3827 for prev_offset, _delta_type, delta in reversed(delta_stack): 

3828 # Convert chunks to bytes for apply_delta if needed 

3829 if isinstance(chunks, list): 

3830 chunks_bytes = b"".join(chunks) 

3831 elif isinstance(chunks, tuple): 

3832 # For tuple type, second element is the actual data 

3833 _, chunk_data = chunks 

3834 if isinstance(chunk_data, list): 

3835 chunks_bytes = b"".join(chunk_data) 

3836 else: 

3837 chunks_bytes = chunk_data 

3838 else: 

3839 chunks_bytes = chunks 

3840 

3841 # Apply delta and get result as list 

3842 chunks = apply_delta(chunks_bytes, delta) 

3843 

3844 if prev_offset is not None: 

3845 self.data._offset_cache[prev_offset] = base_type, chunks 

3846 return base_type, chunks 

3847 
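# Sketch of how resolve_object is typically driven (this mirrors get_raw above;
# sha below is a hypothetical object id).
#
#     >>> off = pack.index.object_offset(sha)
#     >>> obj_type, obj = pack.data.get_object_at(off)
#     >>> type_num, chunks = pack.resolve_object(off, obj_type, obj)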

3848 def entries( 

3849 self, progress: Callable[[int, int], None] | None = None 

3850 ) -> Iterator[PackIndexEntry]: 

3851 """Yield entries summarizing the contents of this pack. 

3852 

3853 Args: 

3854 progress: Progress function, called with current and total 

3855 object count. 

3856 Returns: iterator of tuples with (sha, offset, crc32) 

3857 """ 

3858 return self.data.iterentries( 

3859 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3860 ) 

3861 

3862 def sorted_entries( 

3863 self, progress: ProgressFn | None = None 

3864 ) -> Iterator[PackIndexEntry]: 

3865 """Return entries in this pack, sorted by SHA. 

3866 

3867 Args: 

3868 progress: Progress function, called with current and total 

3869 object count 

3870 Returns: Iterator of tuples with (sha, offset, crc32) 

3871 """ 

3872 return iter( 

3873 self.data.sorted_entries( 

3874 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3875 ) 

3876 ) 

3877 

3878 def get_unpacked_object( 

3879 self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True 

3880 ) -> UnpackedObject: 

3881 """Get the unpacked object for a sha. 

3882 

3883 Args: 

3884 sha: SHA of object to fetch 

3885 include_comp: Whether to include compression data in UnpackedObject 

3886 convert_ofs_delta: Whether to convert offset deltas to ref deltas 

3887 """ 

3888 offset = self.index.object_offset(sha) 

3889 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp) 

3890 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta: 

3891 assert isinstance(unpacked.delta_base, int) 

3892 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base) 

3893 unpacked.pack_type_num = REF_DELTA 

3894 return unpacked 

3895 

3896 
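# Hedged end-to-end sketch: a Pack is opened from the shared basename of its
# .pack/.idx files (path below is hypothetical) and used as a context manager.
#
#     >>> with Pack("/repo/.git/objects/pack/pack-deadbeef") as p:
#     ...     n = len(p)                        # number of objects in the index
#     ...     for obj in p.iterobjects():       # fully resolved ShaFile objects
#     ...         pass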

3897def extend_pack( 

3898 f: BinaryIO, 

3899 object_ids: Set[ObjectID], 

3900 get_raw: Callable[[ObjectID], tuple[int, bytes]], 

3901 *, 

3902 compression_level: int = -1, 

3903 progress: Callable[[bytes], None] | None = None, 

3904) -> tuple[bytes, list[tuple[bytes, int, int]]]: 

3905 """Extend a pack file with more objects. 

3906 

3907 The caller should make sure that object_ids does not contain any objects 

3908 that are already in the pack. 

 Returns: Tuple of the new pack checksum and the list of (object id, offset, crc32) entries for the appended objects. 

3909 """ 

3910 # Update the header with the new number of objects. 

3911 f.seek(0) 

3912 _version, num_objects = read_pack_header(f.read) 

3913 

3914 if object_ids: 

3915 f.seek(0) 

3916 write_pack_header(f.write, num_objects + len(object_ids)) 

3917 

3918 # Must flush before reading (http://bugs.python.org/issue3207) 

3919 f.flush() 

3920 

3921 # Rescan the rest of the pack, computing the SHA with the new header. 

3922 new_sha = compute_file_sha(f, end_ofs=-20) 

3923 

3924 # Must reposition before writing (http://bugs.python.org/issue3207) 

3925 f.seek(0, os.SEEK_CUR) 

3926 

3927 extra_entries = [] 

3928 

3929 # Complete the pack. 

3930 for i, object_id in enumerate(object_ids): 

3931 if progress is not None: 

3932 progress( 

3933 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii") 

3934 ) 

3935 assert len(object_id) == 20 

3936 type_num, data = get_raw(object_id) 

3937 offset = f.tell() 

3938 crc32 = write_pack_object( 

3939 f.write, 

3940 type_num, 

3941 [data], # Convert bytes to list[bytes] 

3942 sha=new_sha, 

3943 compression_level=compression_level, 

3944 ) 

3945 extra_entries.append((object_id, offset, crc32)) 

3946 pack_sha = new_sha.digest() 

3947 f.write(pack_sha) 

3948 return pack_sha, extra_entries 

3949 

3950 
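# Hedged sketch for extend_pack: append base objects that a thin pack is still
# missing, pulling their raw data from an object store (names hypothetical).
#
#     >>> def get_raw(oid):
#     ...     return object_store.get_raw(oid)   # (type_num, raw bytes)
#     >>> pack_sha, extra_entries = extend_pack(
#     ...     pack_file, missing_bases, get_raw, compression_level=6)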

3951try: 

3952 from dulwich._pack import ( # type: ignore 

3953 apply_delta, 

3954 bisect_find_sha, 

3955 ) 

3956except ImportError: 

3957 pass 

3958 

3959# Try to import the Rust version of create_delta 

3960try: 

3961 from dulwich._pack import create_delta as _create_delta_rs 

3962except ImportError: 

3963 pass 

3964else: 

3965 # Wrap the Rust version (which returns bytes) to match the Python API, which yields chunks 

3966 def _create_delta_rs_wrapper(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3967 """Wrapper for Rust create_delta to match Python API.""" 

3968 yield _create_delta_rs(base_buf, target_buf) 

3969 

3970 create_delta = _create_delta_rs_wrapper
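# Hedged round-trip sketch for the delta helpers bound above: create_delta
# yields delta chunks and apply_delta reconstructs the target from the base
# (assuming the chunk-based return value used elsewhere in this module).
#
#     >>> base, target = b"hello world", b"hello there, world"
#     >>> delta = b"".join(create_delta(base, target))
#     >>> b"".join(apply_delta(base, delta)) == target
#     True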