Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1845 statements  

1# pack.py -- For dealing with packed git objects. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Classes for dealing with packed git objects. 

24 

25A pack is a compact representation of a bunch of objects, stored 

26using deltas where possible. 

27 

28They have two parts, the pack file, which stores the data, and an index 

29that tells you where the data is. 

30 

31To find an object you look in all of the index files 'til you find a 

32match for the object name. You then use the pointer got from this as 

33a pointer in to the corresponding packfile. 

34""" 

35 

36__all__ = [ 

37 "DEFAULT_PACK_DELTA_WINDOW_SIZE", 

38 "DEFAULT_PACK_INDEX_VERSION", 

39 "DELTA_TYPES", 

40 "OFS_DELTA", 

41 "PACK_SPOOL_FILE_MAX_SIZE", 

42 "REF_DELTA", 

43 "DeltaChainIterator", 

44 "FilePackIndex", 

45 "MemoryPackIndex", 

46 "ObjectContainer", 

47 "Pack", 

48 "PackChunkGenerator", 

49 "PackData", 

50 "PackFileDisappeared", 

51 "PackHint", 

52 "PackIndex", 

53 "PackIndex1", 

54 "PackIndex2", 

55 "PackIndex3", 

56 "PackIndexEntry", 

57 "PackIndexer", 

58 "PackInflater", 

59 "PackStreamCopier", 

60 "PackStreamReader", 

61 "PackedObjectContainer", 

62 "SHA1Reader", 

63 "SHA1Writer", 

64 "UnpackedObject", 

65 "UnpackedObjectIterator", 

66 "UnpackedObjectStream", 

67 "UnresolvedDeltas", 

68 "apply_delta", 

69 "bisect_find_sha", 

70 "chunks_length", 

71 "compute_file_sha", 

72 "deltas_from_sorted_objects", 

73 "deltify_pack_objects", 

74 "extend_pack", 

75 "find_reusable_deltas", 

76 "full_unpacked_object", 

77 "generate_unpacked_objects", 

78 "iter_sha1", 

79 "load_pack_index", 

80 "load_pack_index_file", 

81 "obj_sha", 

82 "pack_header_chunks", 

83 "pack_object_chunks", 

84 "pack_object_header", 

85 "pack_objects_to_data", 

86 "read_pack_header", 

87 "read_zlib_chunks", 

88 "sort_objects_for_delta", 

89 "take_msb_bytes", 

90 "unpack_object", 

91 "verify_and_read", 

92 "write_pack", 

93 "write_pack_data", 

94 "write_pack_from_container", 

95 "write_pack_header", 

96 "write_pack_index", 

97 "write_pack_object", 

98 "write_pack_objects", 

99] 

100 

101import binascii 

102from collections import defaultdict, deque 

103from contextlib import suppress 

104from io import BytesIO, UnsupportedOperation 

105 

106try: 

107 from cdifflib import CSequenceMatcher as SequenceMatcher 

108except ModuleNotFoundError: 

109 from difflib import SequenceMatcher 

110 

111import os 

112import struct 

113import sys 

114import warnings 

115import zlib 

116from collections.abc import Callable, Iterable, Iterator, Sequence, Set 

117from hashlib import sha1, sha256 

118from itertools import chain 

119from os import SEEK_CUR, SEEK_END 

120from struct import unpack_from 

121from types import TracebackType 

122from typing import ( 

123 IO, 

124 TYPE_CHECKING, 

125 Any, 

126 BinaryIO, 

127 Generic, 

128 Protocol, 

129 TypeVar, 

130) 

131 

132if sys.version_info >= (3, 11): 

133 from typing import Self 

134else: 

135 from typing_extensions import Self 

136 

137try: 

138 import mmap 

139except ImportError: 

140 has_mmap = False 

141else: 

142 has_mmap = True 

143 

144if TYPE_CHECKING: 

145 from _hashlib import HASH as HashObject 

146 

147 from .bitmap import PackBitmap 

148 from .commit_graph import CommitGraph 

149 from .object_store import BaseObjectStore 

150 from .ref import Ref 

151 

152# For some reason the above try, except fails to set has_mmap = False for plan9 

153if sys.platform == "Plan9": 

154 has_mmap = False 

155 

156from .errors import ApplyDeltaError, ChecksumMismatch 

157from .file import GitFile, _GitFile 

158from .lru_cache import LRUSizeCache 

159from .object_format import OBJECT_FORMAT_TYPE_NUMS, SHA1, ObjectFormat 

160from .objects import ( 

161 ObjectID, 

162 RawObjectID, 

163 ShaFile, 

164 hex_to_sha, 

165 object_header, 

166 sha_to_hex, 

167) 

168 

169OFS_DELTA = 6 

170REF_DELTA = 7 

171 

172DELTA_TYPES = (OFS_DELTA, REF_DELTA) 

173 

174 

175DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 

176 

177# Keep pack files under 16Mb in memory, otherwise write them out to disk 

178PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 

179 

180# Default pack index version to use when none is specified 

181DEFAULT_PACK_INDEX_VERSION = 2 

182 

183 

184OldUnpackedObject = tuple[bytes | int, list[bytes]] | list[bytes] | bytes 

185ResolveExtRefFn = Callable[[RawObjectID | ObjectID], tuple[int, bytes | list[bytes]]] 

186ProgressFn = Callable[[int, str], None] 

187PackHint = tuple[int, bytes | None] 

188 

189 

def verify_and_read(
    read_func: Callable[[int], bytes],
    expected_hash: bytes,
    hash_algo: str,
    progress: Callable[[bytes], None] | None = None,
) -> Iterator[bytes]:
    """Spool a stream, check its digest, and only then hand out the data.

    Everything returned by ``read_func`` is written to a spooled temporary
    file (kept in memory up to ``PACK_SPOOL_FILE_MAX_SIZE`` bytes, rolled
    over to disk beyond that) while the digest is computed. No chunk is
    yielded until the digest has been compared with ``expected_hash``, so a
    corrupted or tampered stream never reaches the consumer.

    Because this is a generator, nothing is read until iteration starts.

    Args:
        read_func: Callable taking a byte count and returning that many bytes
            or fewer; a falsy return value marks the end of the stream
        expected_hash: Expected digest as ASCII hex bytes (e.g., b'a3b2c1...')
        hash_algo: Name of the hash algorithm, looked up in OBJECT_FORMATS
        progress: Optional callback invoked once after successful verification

    Yields:
        The verified data, in chunks of at most 64KB

    Raises:
        ValueError: If the algorithm is unknown or the digest does not match
    """
    from tempfile import SpooledTemporaryFile

    from .object_format import OBJECT_FORMATS

    chunk_size = 65536  # 64KB, used for both the download and the replay

    selected_format = OBJECT_FORMATS.get(hash_algo)
    if selected_format is None:
        raise ValueError(f"Unsupported hash algorithm: {hash_algo}")
    digest = selected_format.new_hash()

    with SpooledTemporaryFile(
        max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix="dulwich-verify-"
    ) as spool:
        # Phase 1: drain the source, hashing and spooling each piece.
        while incoming := read_func(chunk_size):
            digest.update(incoming)
            spool.write(incoming)

        # Phase 2: refuse to release anything unless the digest matches.
        actual_hash = digest.hexdigest().encode("ascii")
        if actual_hash != expected_hash:
            raise ValueError(
                f"hash mismatch: expected {expected_hash.decode('ascii')}, "
                f"got {actual_hash.decode('ascii')}"
            )

        if progress:
            progress(b"Hash verified, processing data\n")

        # Phase 3: replay the spooled data to the caller.
        spool.seek(0)
        while outgoing := spool.read(chunk_size):
            yield outgoing

256 

257 

class UnresolvedDeltas(Exception):
    """Delta objects could not be resolved.

    The SHAs of the unresolved delta objects are available as ``shas``.
    """

    def __init__(self, shas: list[bytes]) -> None:
        """Initialize UnresolvedDeltas exception.

        Args:
            shas: List of SHA hashes for unresolved delta objects
        """
        # Hand the payload to Exception as well, so that ``args`` and
        # ``str()`` are populated and the exception can be rebuilt by
        # copy/pickle (which re-invoke the constructor with ``args``).
        super().__init__(shas)
        self.shas = shas

268 

269 

class ObjectContainer(Protocol):
    """Structural interface for anything that can hold git objects."""

    def add_object(self, obj: ShaFile) -> None:
        """Store one object in this container.

        Args:
            obj: The object to add
        """
        ...

    def add_objects(
        self,
        objects: Sequence[tuple[ShaFile, str | None]],
        progress: Callable[..., None] | None = None,
    ) -> "Pack | None":
        """Store several objects in this container at once.

        Args:
            objects: Sequence of (object, path) tuples; the path may be None
            progress: Optional callback reporting insertion progress
        Returns: The Pack the objects were written to, if any.
        """
        ...

    def __contains__(self, sha1: "ObjectID") -> bool:
        """Tell whether the object with the given hex sha is present."""
        ...

    def __getitem__(self, sha1: "ObjectID | RawObjectID") -> ShaFile:
        """Look up an object by its sha."""
        ...

    def get_commit_graph(self) -> "CommitGraph | None":
        """Return the commit graph backing this container.

        This default implementation has no commit graph and always
        returns None.

        Returns:
            CommitGraph object if available, None otherwise
        """
        return None

302 

303 

class PackedObjectContainer(ObjectContainer):
    """Object container whose objects live in a pack file.

    Every method here is a placeholder that raises NotImplementedError.
    """

    def get_unpacked_object(
        self, sha1: "ObjectID | RawObjectID", *, include_comp: bool = False
    ) -> "UnpackedObject":
        """Fetch an object in its raw, unresolved pack form.

        Args:
            sha1: Hash identifying the object
            include_comp: Whether the compressed data should be included

        Returns:
            UnpackedObject instance

        Raises:
            NotImplementedError: Always, in this base implementation
        """
        raise NotImplementedError(self.get_unpacked_object)

    def iterobjects_subset(
        self, shas: Iterable["ObjectID"], *, allow_missing: bool = False
    ) -> Iterator[ShaFile]:
        """Iterate over the objects named by a collection of SHAs.

        Args:
            shas: Iterable of object SHAs to retrieve
            allow_missing: If True, skip missing objects

        Returns:
            Iterator of ShaFile objects

        Raises:
            NotImplementedError: Always, in this base implementation
        """
        raise NotImplementedError(self.iterobjects_subset)

    def iter_unpacked_subset(
        self,
        shas: Iterable["ObjectID | RawObjectID"],
        *,
        include_comp: bool = False,
        allow_missing: bool = False,
        convert_ofs_delta: bool = True,
    ) -> Iterator["UnpackedObject"]:
        """Iterate over the unpacked form of the objects named by ``shas``.

        Args:
            shas: Iterable of object SHAs to retrieve
            include_comp: Include compressed data if True
            allow_missing: If True, skip missing objects
            convert_ofs_delta: If True, convert offset deltas to ref deltas

        Returns:
            Iterator of UnpackedObject instances

        Raises:
            NotImplementedError: Always, in this base implementation
        """
        raise NotImplementedError(self.iter_unpacked_subset)

355 

356 

class UnpackedObjectStream:
    """Base class for a sized, iterable stream of unpacked objects.

    Both methods are placeholders that raise NotImplementedError.
    """

    def __iter__(self) -> Iterator["UnpackedObject"]:
        """Yield the unpacked objects of this stream."""
        raise NotImplementedError(self.__iter__)

    def __len__(self) -> int:
        """Report how many objects this stream contains."""
        raise NotImplementedError(self.__len__)

367 

368 

369def take_msb_bytes( 

370 read: Callable[[int], bytes], crc32: int | None = None 

371) -> tuple[list[int], int | None]: 

372 """Read bytes marked with most significant bit. 

373 

374 Args: 

375 read: Read function 

376 crc32: Optional CRC32 checksum to update 

377 

378 Returns: 

379 Tuple of (list of bytes read, updated CRC32 or None) 

380 """ 

381 ret: list[int] = [] 

382 while len(ret) == 0 or ret[-1] & 0x80: 

383 b = read(1) 

384 if crc32 is not None: 

385 crc32 = binascii.crc32(b, crc32) 

386 ret.append(ord(b[:1])) 

387 return ret, crc32 

388 

389 

class PackFileDisappeared(Exception):
    """Raised when a pack file unexpectedly disappears.

    This typically happens when a concurrent operation (e.g. ``git repack``
    or ``git gc --auto``) removes a pack file between the moment dulwich
    snapshots the pack directory and the moment it actually opens the
    pack's ``.idx`` or ``.pack`` file.

    The ``obj`` attribute holds the :class:`Pack` (or :class:`FilePackIndex`)
    whose backing file vanished, so the caller can evict the stale object
    from its cache and rescan the pack directory.
    """

    obj: "Pack | FilePackIndex"

    def __init__(self, obj: "Pack | FilePackIndex") -> None:
        """Initialize PackFileDisappeared exception.

        Args:
            obj: The pack or pack index that disappeared.
        """
        # Pass the payload on to Exception so ``args``/``str()`` identify the
        # affected pack and the exception can be rebuilt by copy/pickle
        # (which re-invoke the constructor with ``args``).
        super().__init__(obj)
        self.obj = obj

412 

413 

class UnpackedObject:
    """A single object as read out of a pack file.

    Instances are meant to be produced by unpack_object. Many attributes
    start out empty and get filled in later by read_zlib_chunks,
    unpack_object, DeltaChainIterator, etc.

    Consumers must make sure that whatever produced the instance guarantees
    the attributes they intend to read have been set.
    """

    __slots__ = [
        "_sha",  # Cached binary SHA.
        "comp_chunks",  # Compressed object chunks.
        "crc32",  # CRC32.
        "decomp_chunks",  # Decompressed object chunks.
        "decomp_len",  # Decompressed length of this object.
        "delta_base",  # Delta base offset or SHA.
        "hash_func",  # Hash function to use for computing object IDs.
        "obj_chunks",  # Decompressed and delta-resolved chunks.
        "obj_type_num",  # Type of this object.
        "offset",  # Offset in its pack.
        "pack_type_num",  # Type of this object in the pack (may be a delta).
    ]

    obj_type_num: int | None
    obj_chunks: list[bytes] | None
    delta_base: None | bytes | int
    decomp_chunks: list[bytes]
    comp_chunks: list[bytes] | None
    decomp_len: int | None
    crc32: int | None
    offset: int | None
    pack_type_num: int
    _sha: bytes | None
    hash_func: Callable[[], "HashObject"]

    # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be
    # methods of this object.
    def __init__(
        self,
        pack_type_num: int,
        *,
        delta_base: None | bytes | int = None,
        decomp_len: int | None = None,
        crc32: int | None = None,
        sha: bytes | None = None,
        decomp_chunks: list[bytes] | None = None,
        offset: int | None = None,
        hash_func: Callable[[], "HashObject"] = sha1,
    ) -> None:
        """Set up an UnpackedObject.

        Args:
            pack_type_num: Type number of this object in the pack
            delta_base: Delta base (offset or SHA) if this is a delta object
            decomp_len: Decompressed length; when omitted but decomp_chunks
                is given, it is computed from the chunks
            crc32: CRC32 checksum
            sha: SHA hash of the object, if already known
            decomp_chunks: Decompressed chunks
            offset: Offset in the pack file
            hash_func: Hash function to use (defaults to sha1)
        """
        is_delta = pack_type_num in DELTA_TYPES

        self.pack_type_num = pack_type_num
        self.offset = offset
        self.delta_base = delta_base
        self.crc32 = crc32
        self.hash_func = hash_func
        self._sha = sha
        self.comp_chunks = None
        self.decomp_chunks = decomp_chunks or []

        if decomp_len is None and decomp_chunks is not None:
            decomp_len = sum(len(chunk) for chunk in decomp_chunks)
        self.decomp_len = decomp_len

        # A delta has no resolved type or content yet; anything else is
        # already complete, so its object chunks are the decompressed chunks.
        self.obj_type_num = None if is_delta else pack_type_num
        self.obj_chunks = None if is_delta else self.decomp_chunks

    def sha(self) -> RawObjectID:
        """Return the binary SHA of this object, computing it on first use."""
        if self._sha is None:
            assert self.obj_type_num is not None and self.obj_chunks is not None
            self._sha = obj_sha(self.obj_type_num, self.obj_chunks, self.hash_func)
        return RawObjectID(self._sha)

    def sha_file(self) -> ShaFile:
        """Build a ShaFile from the resolved type and chunks of this object."""
        assert self.obj_type_num is not None and self.obj_chunks is not None
        return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks)

    # Only provided for backwards compatibility with code that expects either
    # chunks or a delta tuple.
    def _obj(self) -> OldUnpackedObject:
        """Return the decompressed chunks, or (delta base, delta chunks)."""
        if self.pack_type_num not in DELTA_TYPES:
            return self.decomp_chunks
        assert isinstance(self.delta_base, bytes | int)
        return (self.delta_base, self.decomp_chunks)

    def __eq__(self, other: object) -> bool:
        """Compare slot by slot with another UnpackedObject."""
        if not isinstance(other, UnpackedObject):
            return False
        return not any(
            getattr(self, slot) != getattr(other, slot) for slot in self.__slots__
        )

    def __ne__(self, other: object) -> bool:
        """Return the negation of the equality comparison."""
        is_equal = self == other
        return not is_equal

    def __repr__(self) -> str:
        """Render the class name followed by every slot as ``name=value``."""
        fields = ", ".join(f"{slot}={getattr(self, slot)!r}" for slot in self.__slots__)
        return f"{self.__class__.__name__}({fields})"

537 

538 

539_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance 

540 

541# Default maximum memory for caching delta base objects (matches Git's default 

542# for core.deltaBaseCacheLimit). 

543DEFAULT_DELTA_BASE_CACHE_LIMIT = 96 * 1024 * 1024 # 96 MiB 

544 

545 

def read_zlib_chunks(
    read_some: Callable[[int], bytes],
    unpacked: UnpackedObject,
    include_comp: bool = False,
    buffer_size: int = _ZLIB_BUFSIZE,
) -> bytes:
    """Read zlib data from a buffer.

    This function requires that the buffer have additional data following the
    compressed data, which is guaranteed to be the case for git pack files.

    Args:
      read_some: Read function that returns at least one byte, but may
        return less than the requested size.
      unpacked: An UnpackedObject to write result data to. Its decomp_len
        must already hold the expected decompressed size. If its crc32
        attr is not None, the CRC32 of the compressed bytes will be computed
        using this starting CRC32.
        After this function, will have the following attrs set:
        * comp_chunks    (if include_comp is True)
        * decomp_chunks  (decompressed pieces are appended to the list)
        * crc32
      include_comp: If True, include compressed data in the result.
      buffer_size: Size of the read buffer.
    Returns: Leftover unused data from the decompression.

    Raises:
      ValueError: if unpacked.decomp_len is None or negative.
      zlib.error: if a decompression error occurred, the input ended before
        the zlib stream did, or the decompressed size differs from
        unpacked.decomp_len.
    """
    expected_len = unpacked.decomp_len
    if expected_len is None or expected_len <= -1:
        raise ValueError("non-negative zlib data stream size expected")
    decomp_obj = zlib.decompressobj()

    comp_chunks = []
    decomp_chunks = unpacked.decomp_chunks
    decomp_len = 0
    crc32 = unpacked.crc32

    while True:
        add = read_some(buffer_size)
        if not add:
            raise zlib.error("EOF before end of zlib stream")
        comp_chunks.append(add)
        decomp = decomp_obj.decompress(add)
        decomp_len += len(decomp)
        decomp_chunks.append(decomp)
        if decomp_len > expected_len:
            # The total only ever grows, so the final size check below could
            # never pass. Stop now instead of inflating (and buffering) the
            # rest of a stream that lies about its size.
            raise zlib.error("decompressed data does not match expected size")
        unused = decomp_obj.unused_data
        if unused:
            # The zlib stream ended inside this read; the trailing ``left``
            # bytes belong to whatever follows it, so exclude them from the
            # CRC and from the recorded compressed data.
            left = len(unused)
            if crc32 is not None:
                crc32 = binascii.crc32(add[:-left], crc32)
            if include_comp:
                comp_chunks[-1] = add[:-left]
            break
        elif crc32 is not None:
            crc32 = binascii.crc32(add, crc32)
    if crc32 is not None:
        crc32 &= 0xFFFFFFFF

    if decomp_len != expected_len:
        raise zlib.error("decompressed data does not match expected size")

    unpacked.crc32 = crc32
    if include_comp:
        unpacked.comp_chunks = comp_chunks
    return unused

612 

613 

def iter_sha1(iter: Iterable[bytes]) -> bytes:
    """Compute one SHA1 over a sequence of names, fed in order.

    Args:
      iter: Iterable of bytes objects to hash
    Returns: 40-byte hex sha1 digest, as ASCII bytes
    """
    digest = sha1()
    for chunk in iter:
        digest.update(chunk)
    hex_text = digest.hexdigest()
    return hex_text.encode("ascii")

625 

626 

def load_pack_index(
    path: str | os.PathLike[str], object_format: ObjectFormat
) -> "PackIndex":
    """Open the index file at ``path`` and load it.

    The file is opened in binary mode and the actual parsing is delegated to
    load_pack_index_file.

    Args:
      path: Path to the index file
      object_format: Hash algorithm used by the repository
    Returns: A PackIndex loaded from the given path
    """
    with GitFile(path, "rb") as index_file:
        loaded = load_pack_index_file(path, index_file, object_format)
        return loaded

639 

640 

641def _load_file_contents( 

642 f: IO[bytes] | _GitFile, size: int | None = None 

643) -> tuple[bytes | Any, int]: 

644 """Load contents from a file, preferring mmap when possible. 

645 

646 Args: 

647 f: File-like object to load 

648 size: Expected size, or None to determine from file 

649 Returns: Tuple of (contents, size) 

650 """ 

651 try: 

652 fd = f.fileno() 

653 except (UnsupportedOperation, AttributeError): 

654 fd = None 

655 # Attempt to use mmap if possible 

656 if fd is not None: 

657 if size is None: 

658 size = os.fstat(fd).st_size 

659 if has_mmap: 

660 try: 

661 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ) 

662 except (OSError, ValueError): 

663 # Can't mmap - perhaps a socket or invalid file descriptor 

664 pass 

665 else: 

666 return contents, size 

667 contents_bytes = f.read() 

668 size = len(contents_bytes) 

669 return contents_bytes, size 

670 

671 

def load_pack_index_file(
    path: str | os.PathLike[str],
    f: IO[bytes] | _GitFile,
    object_format: ObjectFormat,
) -> "PackIndex":
    """Build the right PackIndex flavour for an already opened index file.

    Contents that do not start with the ``\\377tOc`` magic are loaded as
    PackIndex1. Otherwise the big-endian 32-bit version number that follows
    the magic selects PackIndex2 or PackIndex3.

    Args:
      path: Path for the index file
      f: File-like object
      object_format: Hash algorithm used by the repository
    Returns: A PackIndex loaded from the given file

    Raises:
      KeyError: if the magic is present but the version is neither 2 nor 3
    """
    contents, size = _load_file_contents(f)

    if contents[:4] != b"\377tOc":
        return PackIndex1(path, object_format, file=f, contents=contents, size=size)

    (version,) = struct.unpack(b">L", contents[4:8])
    if version == 2:
        return PackIndex2(
            path,
            object_format,
            file=f,
            contents=contents,
            size=size,
        )
    if version == 3:
        return PackIndex3(path, object_format, file=f, contents=contents, size=size)
    raise KeyError(f"Unknown pack index format {version}")

702 

703 

704def bisect_find_sha( 

705 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes] 

706) -> int | None: 

707 """Find a SHA in a data blob with sorted SHAs. 

708 

709 Args: 

710 start: Start index of range to search 

711 end: End index of range to search 

712 sha: Sha to find 

713 unpack_name: Callback to retrieve SHA by index 

714 Returns: Index of the SHA, or None if it wasn't found 

715 """ 

716 assert start <= end 

717 while start <= end: 

718 i = (start + end) // 2 

719 file_sha = unpack_name(i) 

720 if file_sha < sha: 

721 start = i + 1 

722 elif file_sha > sha: 

723 end = i - 1 

724 else: 

725 return i 

726 return None 

727 

728 

729PackIndexEntry = tuple[RawObjectID, int, int | None] 

730 

731 

class PackIndex:
    """An index in to a packfile.

    Given a sha id of an object a pack index can tell you the location in the
    packfile of that object if it has it.

    This base class provides the behaviour that can be expressed through
    ``iterentries()`` and ``_itersha()``; the remaining methods raise
    NotImplementedError and are meant to be overridden.
    """

    object_format: "ObjectFormat"

    def __eq__(self, other: object) -> bool:
        """Check equality with another PackIndex.

        Two indexes are equal when they list the same object names in the
        same order. Offsets and CRC32 values are not compared.
        """
        if not isinstance(other, PackIndex):
            return False

        # Walk both entry streams in lockstep. A plain zip() would stop at
        # the shorter stream and report an index as equal to any index it is
        # a prefix of, so exhaustion is tracked explicitly.
        exhausted = object()
        mine = self.iterentries()
        theirs = other.iterentries()
        while True:
            left = next(mine, exhausted)
            right = next(theirs, exhausted)
            if left is exhausted or right is exhausted:
                # Equal only if both streams ended at the same time.
                return left is right
            if left[0] != right[0]:
                return False

    def __ne__(self, other: object) -> bool:
        """Check if this pack index is not equal to another."""
        return not self.__eq__(other)

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        raise NotImplementedError(self.__len__)

    def __iter__(self) -> Iterator[ObjectID]:
        """Iterate over the hex SHAs in this pack."""
        return map(lambda sha: sha_to_hex(RawObjectID(sha)), self._itersha())

    def iterentries(self) -> Iterator[PackIndexEntry]:
        """Iterate over the entries in this pack index.

        Returns: iterator over tuples with object name, offset in packfile and
            crc32 checksum.
        """
        raise NotImplementedError(self.iterentries)

    def get_pack_checksum(self) -> bytes | None:
        """Return the SHA1 checksum stored for the corresponding packfile.

        Returns: 20-byte binary digest, or None if not available
        """
        raise NotImplementedError(self.get_pack_checksum)

    def object_offset(self, sha: ObjectID | RawObjectID) -> int:
        """Return the offset in to the corresponding packfile for the object.

        Given the name of an object it will return the offset that object
        lives at within the corresponding pack file.

        NOTE(review): the in-memory implementation in this module raises
        KeyError when the object is not present; this method is annotated to
        return an int, never None.
        """
        raise NotImplementedError(self.object_offset)

    def object_sha1(self, index: int) -> bytes:
        """Return the SHA1 of the object stored at a given pack offset.

        This default implementation scans ``iterentries()`` linearly.

        Args:
          index: Offset of the object in the pack file

        Raises:
          KeyError: if no entry has that offset
        """
        for name, offset, _crc32 in self.iterentries():
            if offset == index:
                return name
        raise KeyError(index)

    def _object_offset(self, sha: bytes) -> int:
        """See object_offset.

        Args:
          sha: A *binary* SHA string. (20 characters long)
        """
        raise NotImplementedError(self._object_offset)

    def objects_sha1(self) -> bytes:
        """Return the hex SHA1 over all the shas of all objects in this pack.

        Note: This is used for the filename of the pack.
        """
        return iter_sha1(self._itersha())

    def _itersha(self) -> Iterator[bytes]:
        """Yield all the SHA1's of the objects in the index, sorted."""
        raise NotImplementedError(self._itersha)

    def iter_prefix(self, prefix: bytes) -> Iterator[RawObjectID]:
        """Iterate over all SHA1s with the given prefix.

        Args:
          prefix: Binary prefix to match
        Returns: Iterator of matching SHA1s
        """
        # Default implementation for PackIndex classes that don't override
        for sha, _, _ in self.iterentries():
            if sha.startswith(prefix):
                yield RawObjectID(sha)

    def close(self) -> None:
        """Close any open files."""

    def check(self) -> None:
        """Check the consistency of this pack index."""

833 

834 

class MemoryPackIndex(PackIndex):
    """Pack index that is stored entirely in memory.

    Lookups by SHA and by offset are served from two dictionaries built
    once from the entries passed to the constructor.
    """

    def __init__(
        self,
        entries: list[PackIndexEntry],
        object_format: ObjectFormat,
        pack_checksum: bytes | None = None,
    ) -> None:
        """Create a new MemoryPackIndex.

        Args:
          entries: Sequence of name, idx, crc32 (sorted)
          object_format: Object format used by this index
          pack_checksum: Optional pack checksum
        """
        self._by_sha = {}
        self._by_offset = {}
        for name, offset, _crc32 in entries:
            self._by_sha[name] = offset
            self._by_offset[offset] = name
        self._entries = entries
        self._pack_checksum = pack_checksum
        self.object_format = object_format

    def get_pack_checksum(self) -> bytes | None:
        """Return the SHA checksum stored for the corresponding packfile."""
        return self._pack_checksum

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        return len(self._entries)

    def object_offset(self, sha: ObjectID | RawObjectID) -> int:
        """Return the offset for the given SHA.

        Args:
          sha: SHA to look up (binary or hex)
        Returns: Offset in the pack file

        Raises:
          KeyError: if the SHA is not in this index
        """
        lookup_sha: RawObjectID
        # A value as long as a hex digest is treated as hex and converted;
        # anything else is taken to be the binary form already.
        if len(sha) == self.object_format.hex_length:
            lookup_sha = hex_to_sha(ObjectID(sha))
        else:
            lookup_sha = RawObjectID(sha)
        return self._by_sha[lookup_sha]

    def object_sha1(self, offset: int) -> bytes:
        """Return the SHA1 for the object at the given offset.

        Raises:
          KeyError: if no object is recorded at that offset
        """
        return self._by_offset[offset]

    def _itersha(self) -> Iterator[bytes]:
        """Iterate over all SHA1s in the index, in entry order."""
        return iter(self._by_sha)

    def iterentries(self) -> Iterator[PackIndexEntry]:
        """Iterate over all index entries."""
        return iter(self._entries)

    @classmethod
    def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex":
        """Create an index from the sorted entries of a PackData object.

        Instantiates ``cls`` so that subclasses get an instance of their own
        type, in line with ``clone``.
        """
        return cls(
            list(pack_data.sorted_entries()),
            pack_checksum=pack_data.get_stored_checksum(),
            object_format=pack_data.object_format,
        )

    @classmethod
    def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex":
        """Create a copy of another PackIndex in memory."""
        return cls(
            list(other_index.iterentries()),
            other_index.object_format,
            other_index.get_pack_checksum(),
        )

911 

912 

913class FilePackIndex(PackIndex): 

914 """Pack index that is based on a file. 

915 

916 To do the loop it opens the file, and indexes first 256 4 byte groups 

917 with the first byte of the sha id. The value in the four byte group indexed 

918 is the end of the group that shares the same starting byte. Subtract one 

919 from the starting byte and index again to find the start of the group. 

920 The values are sorted by sha id within the group, so do the math to find 

921 the start and end offset and then bisect in to find if the value is 

922 present. 

923 """ 

924 

925 _fan_out_table: list[int] 

926 _file: IO[bytes] | _GitFile 

927 

928 def __init__( 

929 self, 

930 filename: str | os.PathLike[str], 

931 file: IO[bytes] | _GitFile | None = None, 

932 contents: "bytes | mmap.mmap | None" = None, 

933 size: int | None = None, 

934 ) -> None: 

935 """Create a pack index object. 

936 

937 Provide it with the name of the index file to consider, and it will map 

938 it whenever required. 

939 """ 

940 self._filename = filename 

941 # Take the size now, so it can be checked each time we map the file to 

942 # ensure that it hasn't changed. 

943 if file is None: 

944 self._file = GitFile(filename, "rb") 

945 else: 

946 self._file = file 

947 if contents is None: 

948 self._contents, self._size = _load_file_contents(self._file, size) 

949 else: 

950 self._contents = contents 

951 self._size = size if size is not None else len(contents) 

952 

953 @property 

954 def path(self) -> str: 

955 """Return the path to this index file.""" 

956 return os.fspath(self._filename) 

957 

958 def __eq__(self, other: object) -> bool: 

959 """Check equality with another FilePackIndex.""" 

960 # Quick optimization: 

961 if ( 

962 isinstance(other, FilePackIndex) 

963 and self._fan_out_table != other._fan_out_table 

964 ): 

965 return False 

966 

967 return super().__eq__(other) 

968 

969 def close(self) -> None: 

970 """Close the underlying file and any mmap.""" 

971 self._file.close() 

972 close_fn = getattr(self._contents, "close", None) 

973 if close_fn is not None: 

974 close_fn() 

975 

    def __len__(self) -> int:
        """Return the number of entries in this pack index.

        This is the last slot of the fan-out table.
        """
        return self._fan_out_table[-1]

979 

    def _unpack_entry(self, i: int) -> PackIndexEntry:
        """Unpack the i-th entry in the index file.

        Not implemented here: this base implementation always raises
        ``NotImplementedError``; the versioned index classes provide it.

        Args:
          i: Zero-based position of the entry in the index.
        Returns: Tuple with object name (SHA), offset in pack file and CRC32
          checksum (if known).
        """
        raise NotImplementedError(self._unpack_entry)

987 

    def _unpack_name(self, i: int) -> bytes:
        """Unpack the i-th name from the index file.

        Not implemented here: this base implementation always raises
        ``NotImplementedError``; the versioned index classes provide it.
        """
        raise NotImplementedError(self._unpack_name)

991 

    def _unpack_offset(self, i: int) -> int:
        """Unpack the i-th object offset from the index file.

        Not implemented here: this base implementation always raises
        ``NotImplementedError``; the versioned index classes provide it.
        """
        raise NotImplementedError(self._unpack_offset)

995 

    def _unpack_crc32_checksum(self, i: int) -> int | None:
        """Unpack the crc32 checksum for the ith object from the index file.

        Not implemented here: this base implementation always raises
        ``NotImplementedError``; the versioned index classes provide it.
        """
        raise NotImplementedError(self._unpack_crc32_checksum)

999 

1000 def _itersha(self) -> Iterator[bytes]: 

1001 """Iterate over all SHA1s in the index.""" 

1002 for i in range(len(self)): 

1003 yield self._unpack_name(i) 

1004 

1005 def iterentries(self) -> Iterator[PackIndexEntry]: 

1006 """Iterate over the entries in this pack index. 

1007 

1008 Returns: iterator over tuples with object name, offset in packfile and 

1009 crc32 checksum. 

1010 """ 

1011 for i in range(len(self)): 

1012 yield self._unpack_entry(i) 

1013 

1014 def _read_fan_out_table(self, start_offset: int) -> list[int]: 

1015 """Read the fan-out table from the index. 

1016 

1017 The fan-out table contains 256 entries mapping first byte values 

1018 to the number of objects with SHA1s less than or equal to that byte. 

1019 

1020 Args: 

1021 start_offset: Offset in the file where the fan-out table starts 

1022 Returns: List of 256 integers 

1023 """ 

1024 ret = [] 

1025 for i in range(0x100): 

1026 fanout_entry = self._contents[ 

1027 start_offset + i * 4 : start_offset + (i + 1) * 4 

1028 ] 

1029 ret.append(struct.unpack(">L", fanout_entry)[0]) 

1030 return ret 

1031 

1032 def check(self) -> None: 

1033 """Check that the stored checksum matches the actual checksum.""" 

1034 actual = self.calculate_checksum() 

1035 stored = self.get_stored_checksum() 

1036 if actual != stored: 

1037 raise ChecksumMismatch(stored, actual) 

1038 

1039 def calculate_checksum(self) -> bytes: 

1040 """Calculate the SHA1 checksum over this pack index. 

1041 

1042 Returns: This is a 20-byte binary digest 

1043 """ 

1044 return sha1(self._contents[:-20]).digest() 

1045 

1046 def get_pack_checksum(self) -> bytes: 

1047 """Return the SHA1 checksum stored for the corresponding packfile. 

1048 

1049 Returns: 20-byte binary digest 

1050 """ 

1051 return bytes(self._contents[-40:-20]) 

1052 

1053 def get_stored_checksum(self) -> bytes: 

1054 """Return the SHA1 checksum stored for this index. 

1055 

1056 Returns: 20-byte binary digest 

1057 """ 

1058 return bytes(self._contents[-20:]) 

1059 

1060 def object_offset(self, sha: ObjectID | RawObjectID) -> int: 

1061 """Return the offset in to the corresponding packfile for the object. 

1062 

1063 Given the name of an object it will return the offset that object 

1064 lives at within the corresponding pack file. If the pack file doesn't 

1065 have the object then None will be returned. 

1066 """ 

1067 lookup_sha: RawObjectID 

1068 if len(sha) == self.object_format.hex_length: # hex string 

1069 lookup_sha = hex_to_sha(ObjectID(sha)) 

1070 else: 

1071 lookup_sha = RawObjectID(sha) 

1072 try: 

1073 return self._object_offset(lookup_sha) 

1074 except ValueError as exc: 

1075 closed = getattr(self._contents, "closed", None) 

1076 if closed in (None, True): 

1077 raise PackFileDisappeared(self) from exc 

1078 raise 

1079 

1080 def _object_offset(self, sha: bytes) -> int: 

1081 """See object_offset. 

1082 

1083 Args: 

1084 sha: A *binary* SHA string. (20 characters long)_ 

1085 """ 

1086 hash_size = getattr(self, "hash_size", 20) # Default to SHA1 for v1 

1087 assert len(sha) == hash_size 

1088 idx = ord(sha[:1]) 

1089 if idx == 0: 

1090 start = 0 

1091 else: 

1092 start = self._fan_out_table[idx - 1] 

1093 end = self._fan_out_table[idx] 

1094 i = bisect_find_sha(start, end, sha, self._unpack_name) 

1095 if i is None: 

1096 raise KeyError(sha) 

1097 return self._unpack_offset(i) 

1098 

1099 def iter_prefix(self, prefix: bytes) -> Iterator[RawObjectID]: 

1100 """Iterate over all SHA1s with the given prefix.""" 

1101 start = ord(prefix[:1]) 

1102 if start == 0: 

1103 start = 0 

1104 else: 

1105 start = self._fan_out_table[start - 1] 

1106 end = ord(prefix[:1]) + 1 

1107 if end == 0x100: 

1108 end = len(self) 

1109 else: 

1110 end = self._fan_out_table[end] 

1111 assert start <= end 

1112 started = False 

1113 for i in range(start, end): 

1114 name: bytes = self._unpack_name(i) 

1115 if name.startswith(prefix): 

1116 yield RawObjectID(name) 

1117 started = True 

1118 elif started: 

1119 break 

1120 

1121 

class PackIndex1(FilePackIndex):
    """Version 1 Pack Index file.

    As read by this class: a fan-out table of 0x100 four-byte slots at offset
    0, followed by fixed-size records made of a 4-byte big-endian pack offset
    and the object name.
    """

    object_format = SHA1

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 1 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository; must be SHA1
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        Raises:
          AssertionError: if ``object_format`` is not SHA1.
        """
        super().__init__(filename, file, contents, size)

        # PackIndex1 only supports SHA1
        if object_format != SHA1:
            raise AssertionError(
                f"PackIndex1 only supports SHA1, not {object_format.name}"
            )

        self.object_format = object_format
        self.version = 1
        self._fan_out_table = self._read_fan_out_table(0)
        self.hash_size = self.object_format.oid_length
        # Each record is a 4-byte offset followed by the object name.
        self._entry_size = self.hash_size + 4

    def _entry_start(self, i: int) -> int:
        """Return the byte position of the i-th record (after the fan-out)."""
        return (0x100 * 4) + (i * self._entry_size)

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, None]:
        """Return ``(name, pack offset, None)`` for the i-th record."""
        record = self._entry_start(i)
        (pack_offset,) = unpack_from(">L", self._contents, record)
        name_start = record + 4
        name = self._contents[name_start : name_start + self.hash_size]
        return (RawObjectID(name), pack_offset, None)

    def _unpack_name(self, i: int) -> bytes:
        """Return the object name of the i-th record."""
        name_start = self._entry_start(i) + 4
        return self._contents[name_start : name_start + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Return the pack offset of the i-th record."""
        (pack_offset,) = unpack_from(">L", self._contents, self._entry_start(i))
        return int(pack_offset)

    def _unpack_crc32_checksum(self, i: int) -> None:
        """Return None: v1 index files do not store CRC32 checksums."""
        return None

1175 

1176 

class PackIndex2(FilePackIndex):
    """Version 2 Pack Index file.

    As read by this class: an 8-byte header (magic + version), the fan-out
    table, then separate tables for names, CRC32s, 4-byte pack offsets and
    8-byte large pack offsets, in that order.
    """

    object_format = SHA1

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 2 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        Raises:
          AssertionError: if the magic bytes or the version field are wrong.
        """
        super().__init__(filename, file, contents, size)
        self.object_format = object_format

        magic = self._contents[:4]
        if magic != b"\377tOc":
            raise AssertionError("Not a v2 pack index file")
        self.version = unpack_from(b">L", self._contents, 4)[0]
        if self.version != 2:
            raise AssertionError(f"Version was {self.version}")

        self._fan_out_table = self._read_fan_out_table(8)
        self.hash_size = self.object_format.oid_length

        # The tables follow each other; each holds one slot per entry.
        count = len(self)
        self._name_table_offset = 8 + 0x100 * 4
        self._crc32_table_offset = self._name_table_offset + self.hash_size * count
        self._pack_offset_table_offset = self._crc32_table_offset + 4 * count
        self._pack_offset_largetable_offset = (
            self._pack_offset_table_offset + 4 * count
        )

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, int]:
        """Return ``(name, pack offset, crc32)`` for the i-th entry."""
        name = RawObjectID(self._unpack_name(i))
        pack_offset = self._unpack_offset(i)
        crc = self._unpack_crc32_checksum(i)
        return (name, pack_offset, crc)

    def _unpack_name(self, i: int) -> bytes:
        """Return the i-th object name from the name table."""
        begin = self._name_table_offset + i * self.hash_size
        return self._contents[begin : begin + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Return the pack offset of the i-th entry.

        When the top bit of the 4-byte slot is set, the remaining 31 bits are
        an index into the table of 8-byte offsets.
        """
        slot = self._pack_offset_table_offset + 4 * i
        value = int(unpack_from(">L", self._contents, slot)[0])
        if not value & 0x80000000:
            return value
        wide_slot = self._pack_offset_largetable_offset + 8 * (value & 0x7FFFFFFF)
        return int(unpack_from(">Q", self._contents, wide_slot)[0])

    def _unpack_crc32_checksum(self, i: int) -> int:
        """Return the CRC32 stored for the i-th entry."""
        slot = self._crc32_table_offset + 4 * i
        return int(unpack_from(">L", self._contents, slot)[0])

    def get_pack_checksum(self) -> bytes:
        """Return the checksum stored for the corresponding packfile.

        Returns: binary digest (size depends on hash algorithm)
        """
        # Index ends with: pack_checksum + index_checksum,
        # each of them hash_size bytes long.
        end = -self.hash_size
        begin = 2 * end
        return bytes(self._contents[begin:end])

    def get_stored_checksum(self) -> bytes:
        """Return the checksum stored for this index.

        Returns: binary digest (size depends on hash algorithm)
        """
        trailer_start = -self.hash_size
        return bytes(self._contents[trailer_start:])

    def calculate_checksum(self) -> bytes:
        """Calculate the checksum over this pack index.

        Returns: binary digest (size depends on hash algorithm)
        Raises:
          ValueError: if ``hash_size`` is neither 20 nor 32.
        """
        # The hash function is chosen from hash_size.
        if self.hash_size not in (20, 32):
            raise ValueError(f"Unsupported hash size: {self.hash_size}")
        hash_func = sha1 if self.hash_size == 20 else sha256
        return hash_func(self._contents[: -self.hash_size]).digest()

1273 

1274 

class PackIndex3(FilePackIndex):
    """Version 3 Pack Index file.

    Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes).

    As read by this class: a 16-byte header (magic, version, hash algorithm
    identifier, shortened object name length), the fan-out table, then
    separate tables for names, CRC32s, 4-byte pack offsets and 8-byte large
    pack offsets. The trailing checksums are handled with the index's own
    hash size rather than the SHA-1 defaults of the base class.
    """

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 3 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        Raises:
          AssertionError: if the magic bytes or version are wrong, or if
            ``object_format`` differs from the format recorded in the file.
        """
        super().__init__(filename, file, contents, size)
        if self._contents[:4] != b"\377tOc":
            raise AssertionError("Not a v3 pack index file")
        (self.version,) = unpack_from(b">L", self._contents, 4)
        if self.version != 3:
            raise AssertionError(f"Version was {self.version}")

        # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
        (self.hash_format,) = unpack_from(b">L", self._contents, 8)
        file_object_format = OBJECT_FORMAT_TYPE_NUMS[self.hash_format]

        # Verify provided object_format matches what's in the file
        if object_format != file_object_format:
            raise AssertionError(
                f"Object format mismatch: provided {object_format.name}, "
                f"but file contains {file_object_format.name}"
            )

        self.object_format = object_format
        self.hash_size = self.object_format.oid_length

        # Read length of shortened object names
        (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12)

        # Calculate offsets based on variable hash size
        self._fan_out_table = self._read_fan_out_table(
            16
        )  # After header (4 + 4 + 4 + 4)
        self._name_table_offset = 16 + 0x100 * 4
        self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self)
        self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
        self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
            self
        )

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, int]:
        """Return ``(name, pack offset, crc32)`` for the i-th entry."""
        return (
            RawObjectID(self._unpack_name(i)),
            self._unpack_offset(i),
            self._unpack_crc32_checksum(i),
        )

    def _unpack_name(self, i: int) -> bytes:
        """Return the i-th object name from the name table."""
        offset = self._name_table_offset + i * self.hash_size
        return self._contents[offset : offset + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Return the pack offset of the i-th entry.

        When the top bit of the 4-byte slot is set, the remaining 31 bits are
        an index into the table of 8-byte offsets.
        """
        offset_pos = self._pack_offset_table_offset + i * 4
        offset = unpack_from(">L", self._contents, offset_pos)[0]
        assert isinstance(offset, int)
        if offset & (2**31):
            large_offset_pos = (
                self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
            )
            offset = unpack_from(">Q", self._contents, large_offset_pos)[0]
            assert isinstance(offset, int)
        return offset

    def _unpack_crc32_checksum(self, i: int) -> int:
        """Return the CRC32 stored for the i-th entry."""
        result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
        assert isinstance(result, int)
        return result

    def get_pack_checksum(self) -> bytes:
        """Return the checksum stored for the corresponding packfile.

        Returns: binary digest (size depends on hash algorithm)
        """
        # Index ends with: pack_checksum + index_checksum, each hash_size
        # bytes. The base class assumes 20 bytes, which is wrong for SHA-256.
        checksum_size = self.hash_size
        return bytes(self._contents[-2 * checksum_size : -checksum_size])

    def get_stored_checksum(self) -> bytes:
        """Return the checksum stored for this index.

        Returns: binary digest (size depends on hash algorithm)
        """
        checksum_size = self.hash_size
        return bytes(self._contents[-checksum_size:])

    def calculate_checksum(self) -> bytes:
        """Calculate the checksum over this pack index.

        The hash function is chosen from ``hash_size`` (20 -> SHA-1,
        32 -> SHA-256) and covers everything except the trailing index
        checksum.

        Returns: binary digest (size depends on hash algorithm)
        Raises:
          ValueError: if ``hash_size`` is neither 20 nor 32.
        """
        if self.hash_size == 20:
            hash_func = sha1
        elif self.hash_size == 32:
            hash_func = sha256
        else:
            raise ValueError(f"Unsupported hash size: {self.hash_size}")

        return hash_func(self._contents[: -self.hash_size]).digest()

1360 

1361 

def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]:
    """Read the header of a pack file.

    The header is 12 bytes: the magic ``b"PACK"``, a big-endian 32-bit
    version and a big-endian 32-bit object count.

    Args:
      read: Read function; called once as ``read(12)``
    Returns: Tuple of (pack version, number of objects).
    Raises:
      AssertionError: if no data is available, the header is truncated, the
        magic is wrong, or the version is not 2 or 3.
    """
    header = read(12)
    if not header:
        raise AssertionError("file too short to contain pack")
    if header[:4] != b"PACK":
        raise AssertionError(f"Invalid pack header {header!r}")
    if len(header) < 12:
        # Report a truncated header like any other malformed pack instead of
        # letting struct.error escape from unpack_from.
        raise AssertionError("file too short to contain pack")
    (version,) = unpack_from(b">L", header, 4)
    if version not in (2, 3):
        raise AssertionError(f"Version was {version}")
    (num_objects,) = unpack_from(b">L", header, 8)
    return (version, num_objects)

1380 

1381 

1382def chunks_length(chunks: bytes | Iterable[bytes]) -> int: 

1383 """Get the total length of a sequence of chunks. 

1384 

1385 Args: 

1386 chunks: Either a single bytes object or an iterable of bytes 

1387 Returns: Total length in bytes 

1388 """ 

1389 if isinstance(chunks, bytes): 

1390 return len(chunks) 

1391 else: 

1392 return sum(map(len, chunks)) 

1393 

1394 

def unpack_object(
    read_all: Callable[[int], bytes],
    hash_func: Callable[[], "HashObject"],
    read_some: Callable[[int], bytes] | None = None,
    compute_crc32: bool = False,
    include_comp: bool = False,
    zlib_bufsize: int = _ZLIB_BUFSIZE,
) -> tuple[UnpackedObject, bytes]:
    """Unpack a Git object.

    Args:
      read_all: Read function that blocks until the number of requested
        bytes are read.
      hash_func: Hash function to use for computing object IDs.
      read_some: Read function that returns at least one byte, but may not
        return the number of bytes requested. Defaults to ``read_all``.
      compute_crc32: If True, compute the CRC32 of the compressed data. If
        False, the returned CRC32 will be None.
      include_comp: If True, include compressed data in the result.
      zlib_bufsize: An optional buffer size for zlib operations.
    Returns: A tuple of (unpacked, unused), where unused is the unused data
      leftover from decompression, and unpacked in an UnpackedObject with
      the following attrs set:

      * obj_chunks     (for non-delta types)
      * pack_type_num
      * delta_base     (for delta types)
      * comp_chunks    (if include_comp is True)
      * decomp_chunks
      * decomp_len
      * crc32          (if compute_crc32 is True)
    Raises:
      AssertionError: if the last byte of an OFS_DELTA base offset still has
        its continuation bit set.
    """
    if read_some is None:
        read_some = read_all
    crc32 = 0 if compute_crc32 else None

    # Object header: bits 4-6 of the first byte are the type, its low 4 bits
    # are the lowest bits of the size, and each following byte contributes 7
    # more size bits (least significant group first).
    raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
    type_num = (raw[0] >> 4) & 0x07
    size = raw[0] & 0x0F
    for i, byte in enumerate(raw[1:]):
        size += (byte & 0x7F) << ((i * 7) + 4)

    delta_base: int | bytes | None
    if type_num == OFS_DELTA:
        raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
        if raw[-1] & 0x80:
            raise AssertionError("OFS_DELTA base offset is not terminated")
        # Base offset: 7 bits per byte, most significant group first, with 1
        # added before each shift.
        delta_base_offset = raw[0] & 0x7F
        for byte in raw[1:]:
            delta_base_offset += 1
            delta_base_offset <<= 7
            delta_base_offset += byte & 0x7F
        delta_base = delta_base_offset
    elif type_num == REF_DELTA:
        # The base is named by a full object id; its size follows from the
        # digest length of hash_func.
        hash_size = len(hash_func().digest())
        delta_base_obj = read_all(hash_size)
        if crc32 is not None:
            crc32 = binascii.crc32(delta_base_obj, crc32)
        delta_base = delta_base_obj
    else:
        delta_base = None

    unpacked = UnpackedObject(
        type_num,
        delta_base=delta_base,
        decomp_len=size,
        crc32=crc32,
        hash_func=hash_func,
    )
    unused = read_zlib_chunks(
        read_some,
        unpacked,
        buffer_size=zlib_bufsize,
        include_comp=include_comp,
    )
    return unpacked, unused

1478 

1479 

def _compute_object_size(value: tuple[int, Any]) -> int:
    """Size an unresolved object for use with LRUSizeCache.

    ``value`` is a ``(type number, payload)`` pair. For delta types the size
    is taken from the payload's element 1; otherwise from the payload itself.
    """
    type_num, payload = value
    chunks = payload[1] if type_num in DELTA_TYPES else payload
    return chunks_length(chunks)

1486 

1487 

class PackStreamReader:
    """Class to read a pack stream.

    The pack is read from a ReceivableProtocol using read() or recv() as
    appropriate.

    While reading, a running hash of everything except the final
    ``hash_size`` bytes is maintained so that ``read_objects`` can compare it
    with the checksum that ends the stream.
    """

    def __init__(
        self,
        hash_func: Callable[[], "HashObject"],
        read_all: Callable[[int], bytes],
        read_some: Callable[[int], bytes] | None = None,
        zlib_bufsize: int = _ZLIB_BUFSIZE,
    ) -> None:
        """Initialize pack stream reader.

        Args:
          hash_func: Hash function to use for computing object IDs
          read_all: Function to read all requested bytes
          read_some: Function to read some bytes (optional); defaults to
            ``read_all``
          zlib_bufsize: Buffer size for zlib decompression
        """
        self.read_all = read_all
        if read_some is None:
            self.read_some = read_all
        else:
            self.read_some = read_some
        self.hash_func = hash_func
        # Running hash over the stream contents (see _read).
        self.sha = hash_func()
        # Checksum length, taken from the digest length of a fresh hash.
        self._hash_size = len(hash_func().digest())
        # Total number of bytes obtained from the read callbacks so far.
        self._offset = 0
        # Bytes already read from the wire but not yet consumed.
        self._rbuf = BytesIO()
        # trailer is a deque to avoid memory allocation on small reads
        self._trailer: deque[int] = deque()
        self._zlib_bufsize = zlib_bufsize

    def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
        """Read up to size bytes using the given callback.

        As a side effect, update the verifier's hash (excluding the last
        hash_size bytes read, which is the pack checksum).

        Args:
          read: The read callback to read from.
          size: The maximum number of bytes to read; the particular
            behavior is callback-specific.
        Returns: Bytes read
        """
        data = read(size)

        # maintain a trailer of the last hash_size bytes we've read
        n = len(data)
        self._offset += n
        tn = len(self._trailer)
        if n >= self._hash_size:
            # The new data alone fills the trailer: the whole old trailer is
            # flushed into the hash and replaced by the tail of data.
            to_pop = tn
            to_add = self._hash_size
        else:
            # Keep all of data in the trailer and flush only as many old
            # bytes as needed to keep the trailer at hash_size bytes.
            to_pop = max(n + tn - self._hash_size, 0)
            to_add = n
        self.sha.update(
            bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)]))
        )
        self._trailer.extend(data[-to_add:])

        # hash everything but the trailer
        self.sha.update(data[:-to_add])
        return data

    def _buf_len(self) -> int:
        """Return the number of unread bytes left in the read buffer.

        The buffer's current position is restored before returning.
        """
        buf = self._rbuf
        start = buf.tell()
        buf.seek(0, SEEK_END)
        end = buf.tell()
        buf.seek(start)
        return end - start

    @property
    def offset(self) -> int:
        """Return current offset in the stream.

        This is the number of bytes read from the callbacks minus the bytes
        still waiting in the read buffer.
        """
        return self._offset - self._buf_len()

    def read(self, size: int) -> bytes:
        """Read, blocking until size bytes are read.

        Buffered bytes are served first; the remainder is requested from
        ``read_all``.
        """
        buf_len = self._buf_len()
        if buf_len >= size:
            return self._rbuf.read(size)
        buf_data = self._rbuf.read()
        # The buffer is fully drained: start over with an empty one.
        self._rbuf = BytesIO()
        return buf_data + self._read(self.read_all, size - buf_len)

    def recv(self, size: int) -> bytes:
        """Read up to size bytes, blocking until one byte is read.

        If any bytes are buffered, only buffered bytes are returned and the
        ``read_some`` callback is not invoked.
        """
        buf_len = self._buf_len()
        if buf_len:
            data = self._rbuf.read(size)
            if size >= buf_len:
                # Everything buffered was consumed.
                self._rbuf = BytesIO()
            return data
        return self._read(self.read_some, size)

    def __len__(self) -> int:
        """Return the number of objects in this pack.

        ``_num_objects`` is assigned by ``read_objects`` and is not set in
        ``__init__``.
        """
        return self._num_objects

    def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]:
        """Read the objects in this pack file.

        Args:
          compute_crc32: If True, compute the CRC32 of the compressed
            data. If False, the returned CRC32 will be None.
        Returns: Iterator over UnpackedObjects with the following members set:
            offset
            obj_type_num
            obj_chunks (for non-delta types)
            delta_base (for delta types)
            decomp_chunks
            decomp_len
            crc32 (if compute_crc32 is True)

        Raises:
          ChecksumMismatch: if the checksum of the pack contents does not
            match the checksum in the pack trailer.
          zlib.error: if an error occurred during zlib decompression.
          IOError: if an error occurred writing to the output file.
        """
        _pack_version, self._num_objects = read_pack_header(self.read)

        for _ in range(self._num_objects):
            # Record where this object starts before consuming its bytes.
            offset = self.offset
            unpacked, unused = unpack_object(
                self.read,
                self.hash_func,
                read_some=self.recv,
                compute_crc32=compute_crc32,
                zlib_bufsize=self._zlib_bufsize,
            )
            unpacked.offset = offset

            # prepend any unused data to current read buffer
            buf = BytesIO()
            buf.write(unused)
            buf.write(self._rbuf.read())
            buf.seek(0)
            self._rbuf = buf

            yield unpacked

        if self._buf_len() < self._hash_size:
            # If the read buffer is full, then the last read() got the whole
            # trailer off the wire. If not, it means there is still some of the
            # trailer to read. We need to read() all hash_size bytes; N come from the
            # read buffer and (hash_size - N) come from the wire.
            self.read(self._hash_size)

        # The trailer now holds the checksum that ended the stream; compare
        # it with the hash accumulated over everything before it.
        pack_sha = bytearray(self._trailer)
        if pack_sha != self.sha.digest():
            raise ChecksumMismatch(
                sha_to_hex(RawObjectID(bytes(pack_sha))), self.sha.hexdigest()
            )

1648 

1649 

class PackStreamCopier(PackStreamReader):
    """Class to verify a pack stream as it is being read.

    The pack is read from a ReceivableProtocol using read() or recv() as
    appropriate and written out to the given file-like object.
    """

    def __init__(
        self,
        hash_func: Callable[[], "HashObject"],
        read_all: Callable[[int], bytes],
        read_some: Callable[[int], bytes] | None,
        outfile: IO[bytes],
        delta_iter: "DeltaChainIterator[UnpackedObject] | None" = None,
    ) -> None:
        """Initialize the copier.

        Args:
          hash_func: Hash function to use for computing object IDs
          read_all: Read function that blocks until the number of
            requested bytes are read.
          read_some: Read function that returns at least one byte, but may
            not return the number of bytes requested.
          outfile: File-like object to write output through.
          delta_iter: Optional DeltaChainIterator to record deltas as we
            read them.
        """
        super().__init__(hash_func, read_all, read_some=read_some)
        self.outfile = outfile
        self._delta_iter = delta_iter

    def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
        """Read data from the read callback and write it to the file."""
        data = super()._read(read, size)
        self.outfile.write(data)
        return data

    def verify(self, progress: Callable[..., None] | None = None) -> None:
        """Verify a pack stream and write it to the output file.

        Every object is read (which copies the raw bytes to ``outfile`` and
        checks the trailing checksum) and, if a delta iterator was given,
        recorded with it.

        Args:
          progress: Optional callback receiving ASCII-encoded status lines.

        See PackStreamReader.read_objects for a list of exceptions this may
        throw.
        """
        copied = 0  # stays 0 if read_objects() yields nothing
        # Count from 1 so the messages report how many entries have been
        # copied rather than the index of the last one.
        for copied, unpacked in enumerate(self.read_objects(), 1):
            if self._delta_iter:
                self._delta_iter.record(unpacked)
            if progress is not None:
                progress(
                    f"copying pack entries: {copied}/{len(self)}\r".encode("ascii")
                )
        if progress is not None:
            progress(f"copied {copied} pack entries\n".encode("ascii"))

1701 

1702 

def obj_sha(
    type: int,
    chunks: bytes | Iterable[bytes],
    hash_func: Callable[[], "HashObject"] = sha1,
) -> bytes:
    """Hash an object given its numeric type and its data.

    The digest covers the header produced by ``object_header`` for the type
    and total data length, followed by the data itself.

    Args:
      type: Object type number
      chunks: Object data, as one bytes object or an iterable of chunks
      hash_func: Hash function to use (defaults to sha1)

    Returns:
      Binary hash digest
    """
    hasher = hash_func()
    hasher.update(object_header(type, chunks_length(chunks)))
    # Treat a single bytes object as a one-element sequence of chunks.
    pieces = (chunks,) if isinstance(chunks, bytes) else chunks
    for piece in pieces:
        hasher.update(piece)
    return hasher.digest()

1726 

1727 

def compute_file_sha(
    f: IO[bytes],
    hash_func: Callable[[], "HashObject"],
    start_ofs: int = 0,
    end_ofs: int = 0,
    buffer_size: int = 1 << 16,
) -> "HashObject":
    """Hash a portion of a file into a new SHA.

    Args:
      f: A file-like object to read from that supports seek().
      hash_func: A callable that returns a new HashObject.
      start_ofs: The offset in the file to start reading at.
      end_ofs: The offset in the file to end reading at, relative to the
        end of the file.
      buffer_size: A buffer size for reading.
    Returns: A new SHA object updated with data read from the file.
    Raises:
      AssertionError: if ``start_ofs`` is negative or the requested range
        does not fit in the file.
    """
    sha = hash_func()
    f.seek(0, SEEK_END)
    length = f.tell()
    if start_ofs < 0:
        raise AssertionError(f"start_ofs cannot be negative: {start_ofs}")
    # Reject any range whose end lies before its start; with a non-negative
    # end_ofs this used to slip through and loop forever on empty reads.
    if end_ofs > length or length + end_ofs < start_ofs:
        raise AssertionError(
            f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}"
        )
    todo = length + end_ofs - start_ofs
    f.seek(start_ofs)
    while todo > 0:
        data = f.read(min(todo, buffer_size))
        if not data:
            # EOF before the expected amount was read: stop instead of
            # spinning on empty reads.
            break
        sha.update(data)
        todo -= len(data)
    return sha

1762 

1763 

1764class PackData: 

1765 """The data contained in a packfile. 

1766 

1767 Pack files can be accessed both sequentially for exploding a pack, and 

1768 directly with the help of an index to retrieve a specific object. 

1769 

1770 The objects within are either complete or a delta against another. 

1771 

1772 The header is variable length. If the MSB of each byte is set then it 

1773 indicates that the subsequent byte is still part of the header. 

1774 For the first byte the next MS bits are the type, which tells you the type 

1775 of object, and whether it is a delta. The LS byte is the lowest bits of the 

1776 size. For each subsequent byte the LS 7 bits are the next MS bits of the 

1777 size, i.e. the last byte of the header contains the MS bits of the size. 

1778 

1779 For the complete objects the data is stored as zlib deflated data. 

1780 The size in the header is the uncompressed object size, so to uncompress 

1781 you need to just keep feeding data to zlib until you get an object back, 

1782 or it errors on bad data. This is done here by just giving the complete 

1783 buffer from the start of the deflated object on. This is bad, but until I 

1784 get mmap sorted out it will have to do. 

1785 

1786 Currently there are no integrity checks done. Also no attempt is made to 

1787 try and detect the delta case, or a request for an object at the wrong 

1788 position. It will all just throw a zlib or KeyError. 

1789 """ 

1790 

1791 def __init__( 

1792 self, 

1793 filename: str | os.PathLike[str], 

1794 object_format: ObjectFormat, 

1795 file: IO[bytes] | None = None, 

1796 size: int | None = None, 

1797 *, 

1798 delta_window_size: int | None = None, 

1799 window_memory: int | None = None, 

1800 delta_cache_size: int | None = None, 

1801 depth: int | None = None, 

1802 threads: int | None = None, 

1803 big_file_threshold: int | None = None, 

1804 delta_base_cache_limit: int | None = None, 

1805 ) -> None: 

1806 """Create a PackData object representing the pack in the given filename. 

1807 

1808 The file must exist and stay readable until the object is disposed of. 

1809 It must also stay the same size. It will be mapped whenever needed. 

1810 

1811 Currently there is a restriction on the size of the pack as the python 

1812 mmap implementation is flawed. 

1813 """ 

1814 self._filename = filename 

1815 self.object_format = object_format 

1816 self._size = size 

1817 self._header_size = 12 

1818 self.delta_window_size = delta_window_size 

1819 self.window_memory = window_memory 

1820 self.delta_cache_size = delta_cache_size 

1821 self.depth = depth 

1822 self.threads = threads 

1823 self.big_file_threshold = big_file_threshold 

1824 self.delta_base_cache_limit = delta_base_cache_limit 

1825 self._file: IO[bytes] 

1826 

1827 if file is None: 

1828 self._file = GitFile(self._filename, "rb") 

1829 else: 

1830 self._file = file 

1831 (_version, self._num_objects) = read_pack_header(self._file.read) 

1832 

1833 # Use delta_base_cache_limit, then delta_cache_size, then default 

1834 cache_size = ( 

1835 delta_base_cache_limit or delta_cache_size or DEFAULT_DELTA_BASE_CACHE_LIMIT 

1836 ) 

1837 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]]( 

1838 cache_size, compute_size=_compute_object_size 

1839 ) 

1840 

1841 @property 

1842 def filename(self) -> str: 

1843 """Get the filename of the pack file. 

1844 

1845 Returns: 

1846 Base filename without directory path 

1847 """ 

1848 return os.path.basename(self._filename) 

1849 

    @property
    def path(self) -> str | os.PathLike[str]:
        """Get the full path of the pack file.

        Returns:
          The filename exactly as it was passed to the constructor
        """
        return self._filename

1858 

    @classmethod
    def from_file(
        cls,
        file: IO[bytes],
        object_format: ObjectFormat,
        size: int | None = None,
    ) -> "PackData":
        """Create a PackData object from an open file.

        No real path is known in this case, so ``str(file)`` is passed as
        the filename.

        Args:
          file: Open file object
          object_format: Object format
          size: Optional file size

        Returns:
          PackData instance
        """
        return cls(str(file), object_format, file=file, size=size)

1877 

@classmethod
def from_path(
    cls,
    path: str | os.PathLike[str],
    object_format: ObjectFormat,
) -> "PackData":
    """Build a PackData for the pack file at ``path``.

    Args:
      path: Path to the pack file
      object_format: Object format

    Returns:
      A new instance of ``cls``
    """
    options = {"filename": path, "object_format": object_format}
    return cls(**options)

1894 

def close(self) -> None:
    """Close the pack file and forget it.

    Calling this again after the file has been released is a no-op.
    """
    if self._file is None:
        return
    self._file.close()
    self._file = None  # type: ignore

1900 

def __del__(self) -> None:
    """Emit a ResourceWarning and close the file if it was left open.

    Nothing happens when ``_file`` is missing or already ``None``.
    """
    if getattr(self, "_file", None) is None:
        return

    import warnings

    warnings.warn(
        f"unclosed PackData {self!r}",
        ResourceWarning,
        stacklevel=2,
        source=self,
    )
    try:
        self.close()
    except Exception:
        # Finalizers must not raise; cleanup here is best-effort only.
        pass

1917 

1918 def __enter__(self) -> Self: 

1919 """Enter context manager.""" 

1920 return self 

1921 

1922 def __exit__( 

1923 self, 

1924 type: type | None, 

1925 value: BaseException | None, 

1926 traceback: TracebackType | None, 

1927 ) -> None: 

1928 """Exit context manager.""" 

1929 self.close() 

1930 

def __eq__(self, other: object) -> bool:
    """Compare two packs by the checksum stored in each pack file.

    Anything that is not a PackData compares unequal.
    """
    if not isinstance(other, PackData):
        return False
    return self.get_stored_checksum() == other.get_stored_checksum()

1936 

def _get_size(self) -> int:
    """Return the pack size in bytes, asking the filesystem at most once.

    A size that is already known is returned as-is. Otherwise the size of
    ``self._filename`` is looked up and remembered.

    Raises:
      AssertionError: if the looked-up size is smaller than the pack header
    """
    known = self._size
    if known is not None:
        return known
    measured = os.path.getsize(self._filename)
    # Remember the size before validating it.
    self._size = measured
    if measured >= self._header_size:
        return measured
    errmsg = f"{self._filename} is too small for a packfile ({measured} < {self._header_size})"
    raise AssertionError(errmsg)

1945 

def __len__(self) -> int:
    """Return the object count read from the pack header."""
    count = self._num_objects
    return count

1949 

def calculate_checksum(self) -> bytes:
    """Hash the pack file contents, leaving out the trailing checksum.

    The last ``oid_length`` bytes of the file are excluded from the hash.

    Returns: Binary digest (size depends on hash algorithm)
    """
    fmt = self.object_format
    hasher = compute_file_sha(
        self._file,
        hash_func=fmt.hash_func,
        end_ofs=-fmt.oid_length,
    )
    return hasher.digest()

1960 

def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]:
    """Yield every object in the pack in file order, without resolving deltas.

    Reading starts right after the pack header. Each yielded object has its
    ``offset`` set to the position it was read from.

    Args:
      include_comp: Passed through to ``unpack_object``
    """
    self._file.seek(self._header_size)

    remaining = self._num_objects
    if remaining is None:
        return

    for _ in range(remaining):
        start = self._file.tell()
        entry, leftover = unpack_object(
            self._file.read,
            self.object_format.hash_func,
            compute_crc32=False,
            include_comp=include_comp,
        )
        entry.offset = start
        yield entry
        # The reader may have consumed bytes past this object; rewind over
        # them so the next object starts at the right place.
        self._file.seek(-len(leftover), SEEK_CUR)

1980 

def iterentries(
    self,
    progress: Callable[[int, int], None] | None = None,
    resolve_ext_ref: ResolveExtRefFn | None = None,
) -> Iterator[PackIndexEntry]:
    """Yield one index entry per object in this pack.

    Entries are produced by a ``PackIndexer`` built over this pack data.

    Args:
      progress: Progress function, called before each entry is yielded with
        the zero-based entry position and the total object count.
      resolve_ext_ref: Optional function to resolve external references
    Returns: iterator of tuples with (sha, offset, crc32)
    """
    total = self._num_objects
    entries = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref)
    report = progress is not None
    for position, entry in enumerate(entries):
        if report:
            progress(position, total)
        yield entry

2000 

def sorted_entries(
    self,
    progress: Callable[[int, int], None] | None = None,
    resolve_ext_ref: ResolveExtRefFn | None = None,
) -> list[tuple[RawObjectID, int, int]]:
    """Collect the entries of this pack into a sorted list.

    Entries are tuples, so they sort by SHA first.

    Args:
      progress: Progress function, called with current and total
        object count
      resolve_ext_ref: Optional function to resolve external references
    Returns: List of tuples with (sha, offset, crc32)
    """
    entries = list(
        self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref)  # type: ignore
    )
    entries.sort()
    return entries

2017 

def create_index_v1(
    self,
    filename: str,
    progress: Callable[..., None] | None = None,
    resolve_ext_ref: ResolveExtRefFn | None = None,
) -> bytes:
    """Write a version 1 index file for this pack.

    Args:
      filename: Index filename.
      progress: Progress report function
      resolve_ext_ref: Optional function to resolve external references
    Returns: The pack checksum from ``calculate_checksum``, which is also
      the value handed to the index writer
    """
    entries = self.sorted_entries(
        progress=progress, resolve_ext_ref=resolve_ext_ref
    )
    pack_checksum = self.calculate_checksum()
    with GitFile(filename, "wb") as index_file:
        write_pack_index_v1(index_file, entries, pack_checksum)
    return pack_checksum

2043 

def create_index_v2(
    self,
    filename: str,
    progress: Callable[..., None] | None = None,
    resolve_ext_ref: ResolveExtRefFn | None = None,
) -> bytes:
    """Write a version 2 index file for this pack.

    Args:
      filename: Index filename.
      progress: Progress report function
      resolve_ext_ref: Optional function to resolve external references
    Returns: Whatever ``write_pack_index_v2`` returns
    """
    entries = self.sorted_entries(
        progress=progress, resolve_ext_ref=resolve_ext_ref
    )
    with GitFile(filename, "wb") as index_file:
        # The pack checksum is computed only once the index file is open.
        pack_checksum = self.calculate_checksum()
        return write_pack_index_v2(index_file, entries, pack_checksum)

2063 

def create_index_v3(
    self,
    filename: str,
    progress: Callable[..., None] | None = None,
    resolve_ext_ref: ResolveExtRefFn | None = None,
    hash_format: int | None = None,
) -> bytes:
    """Write a version 3 index file for this pack.

    Args:
      filename: Index filename.
      progress: Progress report function
      resolve_ext_ref: Function to resolve external references
      hash_format: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256);
        ``None`` is treated as 1
    Returns: Whatever ``write_pack_index_v3`` returns
    """
    entries = self.sorted_entries(
        progress=progress, resolve_ext_ref=resolve_ext_ref
    )
    # Default to SHA-1 when the caller did not pick an algorithm.
    effective_format = 1 if hash_format is None else hash_format
    with GitFile(filename, "wb") as index_file:
        pack_checksum = self.calculate_checksum()
        return write_pack_index_v3(
            index_file, entries, pack_checksum, hash_format=effective_format
        )

2089 

def create_index(
    self,
    filename: str,
    progress: Callable[..., None] | None = None,
    version: int = 2,
    resolve_ext_ref: ResolveExtRefFn | None = None,
    hash_format: int | None = None,
) -> bytes:
    """Write an index file for this pack in the requested format version.

    Dispatches to ``create_index_v1``, ``create_index_v2`` or
    ``create_index_v3`` and returns that method's result.

    Args:
      filename: Index filename.
      progress: Progress report function
      version: Index version (1, 2, or 3)
      resolve_ext_ref: Function to resolve external references
      hash_format: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256);
        only forwarded for version 3
    Returns: Result of the version-specific writer
    Raises:
      ValueError: if ``version`` is not 1, 2 or 3
    """
    if version == 1:
        return self.create_index_v1(
            filename, progress, resolve_ext_ref=resolve_ext_ref
        )
    if version == 2:
        return self.create_index_v2(
            filename, progress, resolve_ext_ref=resolve_ext_ref
        )
    if version == 3:
        return self.create_index_v3(
            filename,
            progress,
            resolve_ext_ref=resolve_ext_ref,
            hash_format=hash_format,
        )
    raise ValueError(f"unknown index format {version}")

2125 

def get_stored_checksum(self) -> bytes:
    """Read the checksum recorded in the last bytes of the pack file.

    The number of bytes read is ``object_format.oid_length``.
    """
    trailer_len = self.object_format.oid_length
    handle = self._file
    handle.seek(-trailer_len, SEEK_END)
    return handle.read(trailer_len)

2131 

def check(self) -> None:
    """Verify that the computed checksum matches the one stored in the pack.

    Raises:
      ChecksumMismatch: if the two checksums differ
    """
    actual = self.calculate_checksum()
    stored = self.get_stored_checksum()
    if actual == stored:
        return
    raise ChecksumMismatch(stored, actual)

2138 

def get_unpacked_object_at(
    self, offset: int, *, include_comp: bool = False
) -> UnpackedObject:
    """Read the object stored at ``offset`` in the pack file.

    Args:
      offset: Position of the object; must not fall inside the pack header
      include_comp: Passed through to ``unpack_object``

    Returns:
      The unpacked object, with its ``offset`` attribute set to ``offset``
    """
    assert offset >= self._header_size
    self._file.seek(offset)
    result = unpack_object(
        self._file.read, self.object_format.hash_func, include_comp=include_comp
    )
    entry = result[0]
    entry.offset = offset
    return entry

2150 

def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]:
    """Return the object found at ``offset`` in the pack file.

    An index gives the offset of an object; this method then fetches it
    from the pack. A cached result is returned when the offset cache has
    one. A freshly read object is not added to the cache here.
    """
    try:
        cached = self._offset_cache[offset]
    except KeyError:
        pass
    else:
        return cached
    entry = self.get_unpacked_object_at(offset, include_comp=False)
    return (entry.pack_type_num, entry._obj())

2164 

2165 

# Type of the items a DeltaChainIterator yields; each subclass fixes it
# through the return type of its ``_result`` method.
T = TypeVar("T")

2167 

2168 

class DeltaChainIterator(Generic[T]):
    """Abstract iterator that walks pack data one delta chain at a time.

    Each object in the pack is guaranteed to be inflated exactly once,
    regardless of how many objects reference it as a delta base. As a result,
    memory usage is proportional to the length of the longest delta chain.

    Subclasses override ``_result`` to choose what the iterator yields. The
    UnpackedObject passed to ``_result`` has the following members set:

    * offset
    * obj_type_num
    * obj_chunks
    * pack_type_num
    * delta_base (for delta types)
    * comp_chunks (if _include_comp is True)
    * decomp_chunks
    * decomp_len
    * crc32 (if _compute_crc32 is True)
    """

    # Options forwarded to unpack_object when an object is resolved.
    _compute_crc32 = False
    _include_comp = False

    def __init__(
        self,
        file_obj: IO[bytes] | None,
        hash_func: Callable[[], "HashObject"],
        *,
        resolve_ext_ref: ResolveExtRefFn | None = None,
        object_format: "ObjectFormat | None" = None,
    ) -> None:
        """Set up an empty iterator.

        Args:
          file_obj: File object to read pack data from
          hash_func: Hash function to use for computing object IDs
          resolve_ext_ref: Optional function to resolve external references
          object_format: Optional object format. Required by subclasses
            that materialise objects (e.g. PackInflater) when iterating
            packs in a non-default hash algorithm such as SHA-256.
        """
        self._file = file_obj
        self.hash_func = hash_func
        self._object_format = object_format
        self._resolve_ext_ref = resolve_ext_ref
        # Delta offsets still waiting for their base, keyed by base offset.
        self._pending_ofs: dict[int, list[int]] = defaultdict(list)
        # Delta offsets still waiting for their base, keyed by base SHA.
        self._pending_ref: dict[bytes, list[int]] = defaultdict(list)
        # (offset, type number) of every recorded non-delta object.
        self._full_ofs: list[tuple[int, int]] = []
        # Bases that had to be fetched through resolve_ext_ref.
        self._ext_refs: list[RawObjectID] = []

    @classmethod
    def for_pack_data(
        cls, pack_data: PackData, resolve_ext_ref: ResolveExtRefFn | None = None
    ) -> "DeltaChainIterator[T]":
        """Build an iterator covering every object in ``pack_data``.

        Args:
          pack_data: PackData object to iterate
          resolve_ext_ref: Optional function to resolve external refs

        Returns:
          DeltaChainIterator instance with all objects recorded
        """
        fmt = pack_data.object_format
        instance = cls(
            None,
            fmt.hash_func,
            resolve_ext_ref=resolve_ext_ref,
            object_format=fmt,
        )
        instance.set_pack_data(pack_data)
        for entry in pack_data.iter_unpacked(include_comp=False):
            instance.record(entry)
        return instance

    @classmethod
    def for_pack_subset(
        cls,
        pack: "Pack",
        shas: Iterable[ObjectID | RawObjectID],
        *,
        allow_missing: bool = False,
        resolve_ext_ref: ResolveExtRefFn | None = None,
    ) -> "DeltaChainIterator[T]":
        """Build an iterator covering selected objects and their delta bases.

        Args:
          pack: Pack object containing the data
          shas: Iterable of object SHAs to include
          allow_missing: If True, skip SHAs the pack index does not know;
            otherwise the KeyError from the index lookup propagates
          resolve_ext_ref: Optional function to resolve external refs

        Returns:
          DeltaChainIterator instance
        """
        fmt = pack.object_format
        instance = cls(
            None,
            fmt.hash_func,
            resolve_ext_ref=resolve_ext_ref,
            object_format=fmt,
        )
        instance.set_pack_data(pack.data)

        queue = set()
        for sha in shas:
            try:
                found = pack.index.object_offset(sha)
            except KeyError:
                if not allow_missing:
                    raise
            else:
                queue.add(found)

        visited = set()
        while queue:
            current = queue.pop()
            entry = pack.data.get_unpacked_object_at(current)
            instance.record(entry)
            visited.add(current)

            # Find where the base of this object lives, if it is a delta
            # whose base is inside this pack.
            parent = None
            if entry.pack_type_num == OFS_DELTA:
                assert entry.offset is not None
                assert entry.delta_base is not None
                assert isinstance(entry.delta_base, int)
                parent = entry.offset - entry.delta_base
            elif entry.pack_type_num == REF_DELTA:
                # A base the index does not know is left for later
                # resolution.
                with suppress(KeyError):
                    assert isinstance(entry.delta_base, bytes)
                    parent = pack.index.object_offset(
                        RawObjectID(entry.delta_base)
                    )
            if parent is None or parent in visited:
                continue
            queue.add(parent)
        return instance

    def record(self, unpacked: UnpackedObject) -> None:
        """Register an unpacked object so iteration will visit it.

        Deltas are filed under their base (offset or SHA); everything else
        is remembered as the start of a chain.

        Args:
          unpacked: UnpackedObject to record; its offset must be set
        """
        kind = unpacked.pack_type_num
        position = unpacked.offset
        assert position is not None
        if kind == OFS_DELTA:
            assert unpacked.delta_base is not None
            assert isinstance(unpacked.delta_base, int)
            self._pending_ofs[position - unpacked.delta_base].append(position)
            return
        if kind == REF_DELTA:
            assert isinstance(unpacked.delta_base, bytes)
            self._pending_ref[unpacked.delta_base].append(position)
            return
        self._full_ofs.append((position, kind))

    def set_pack_data(self, pack_data: PackData) -> None:
        """Read objects from the file that backs ``pack_data``.

        Args:
          pack_data: PackData object to use
        """
        self._file = pack_data._file

    def _walk_all_chains(self) -> Iterator[T]:
        # Chains rooted at full objects first, then chains rooted at
        # external bases.
        for start, kind in self._full_ofs:
            yield from self._follow_chain(start, kind, None)
        yield from self._walk_ref_chains()
        assert not self._pending_ofs, repr(self._pending_ofs)

    def _ensure_no_pending(self) -> None:
        # Any ref-delta still waiting for its base at this point is an error.
        if not self._pending_ref:
            return
        unresolved = [sha_to_hex(RawObjectID(s)) for s in self._pending_ref]
        raise UnresolvedDeltas(unresolved)

    def _walk_ref_chains(self) -> Iterator[T]:
        resolver = self._resolve_ext_ref
        if not resolver:
            self._ensure_no_pending()
            return

        # Iterate over a sorted snapshot: entries are removed along the way.
        for base_sha, waiting in sorted(self._pending_ref.items()):
            if base_sha not in self._pending_ref:
                continue
            try:
                kind, chunks = resolver(RawObjectID(base_sha))
            except KeyError:
                # Not an external ref, but may depend on one. Either it will
                # get popped via a _follow_chain call, or we will raise an
                # error below.
                continue
            self._ext_refs.append(RawObjectID(base_sha))
            self._pending_ref.pop(base_sha)
            for child in waiting:
                yield from self._follow_chain(child, kind, chunks)

        self._ensure_no_pending()

    def _result(self, unpacked: UnpackedObject) -> T:
        raise NotImplementedError

    def _resolve_object(
        self,
        offset: int,
        obj_type_num: int,
        base_chunks: bytes | list[bytes] | None,
    ) -> UnpackedObject:
        assert self._file is not None
        self._file.seek(offset)
        entry, _ = unpack_object(
            self._file.read,
            self.hash_func,
            read_some=None,
            compute_crc32=self._compute_crc32,
            include_comp=self._include_comp,
        )
        entry.offset = offset
        if base_chunks is None:
            # Full object: nothing to apply, only check the expected type.
            assert entry.pack_type_num == obj_type_num
            return entry

        assert entry.pack_type_num in DELTA_TYPES
        entry.obj_type_num = obj_type_num
        entry.obj_chunks = apply_delta(base_chunks, entry.decomp_chunks)
        # A delta that resolves to a zero-byte payload for a
        # commit/tree/tag is malformed: ``_parse_message`` /
        # ``parse_tree`` accept the empty input silently, so without
        # this guard a too-short delta could materialise an
        # otherwise-valid SHA pointing at an empty commit object
        # (which ``git fsck`` rejects). Only blobs may legitimately
        # be empty, and an empty blob would never be stored as a
        # delta in practice.
        # Blob.type_num == 3 (avoid the import cycle).
        if obj_type_num != 3 and chunks_length(entry.obj_chunks) == 0:
            raise ApplyDeltaError(
                f"delta resolved to empty payload for type {obj_type_num}"
            )
        return entry

    def _follow_chain(
        self,
        offset: int,
        obj_type_num: int,
        base_chunks: bytes | list[bytes] | None,
    ) -> Iterator[T]:
        # Unlike PackData.get_object_at, there is no need to cache offsets as
        # this approach by design inflates each object exactly once.
        stack = [(offset, obj_type_num, base_chunks)]
        while stack:
            offset, obj_type_num, base_chunks = stack.pop()
            entry = self._resolve_object(offset, obj_type_num, base_chunks)
            yield self._result(entry)

            assert entry.offset is not None
            # Deltas that were waiting for this object can now be resolved.
            by_offset = self._pending_ofs.pop(entry.offset, [])
            by_sha = self._pending_ref.pop(entry.sha(), [])
            stack.extend(
                [
                    (child, entry.obj_type_num, entry.obj_chunks)  # type: ignore
                    for child in by_offset + by_sha
                ]
            )

    def __iter__(self) -> Iterator[T]:
        """Iterate over the recorded objects, chain by chain."""
        return self._walk_all_chains()

    def ext_refs(self) -> list[RawObjectID]:
        """Return the external bases that have been resolved so far."""
        return self._ext_refs

2435 

2436 

class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]):
    """Delta chain iterator whose items are the unpacked objects themselves."""

    def _result(self, unpacked: UnpackedObject) -> UnpackedObject:
        """Pass the resolved object through untouched.

        Args:
          unpacked: The unpacked object

        Returns:
          The very same object
        """
        resolved = unpacked
        return resolved

2450 

2451 

class PackIndexer(DeltaChainIterator[PackIndexEntry]):
    """Delta chain iterator whose items are pack index entries."""

    # Index entries carry a CRC32, so ask for it while unpacking.
    _compute_crc32 = True

    def _result(self, unpacked: UnpackedObject) -> PackIndexEntry:
        """Summarise a resolved object as an index entry.

        Args:
          unpacked: The unpacked object; its offset must be set

        Returns:
          Tuple of (sha, offset, crc32) for index entry
        """
        assert unpacked.offset is not None
        entry = (unpacked.sha(), unpacked.offset, unpacked.crc32)
        return entry

2468 

2469 

class PackInflater(DeltaChainIterator[ShaFile]):
    """Delta chain iterator whose items are ShaFile objects."""

    def _result(self, unpacked: UnpackedObject) -> ShaFile:
        """Build a ShaFile from a resolved object.

        Args:
          unpacked: The unpacked object; its type number and chunks must be
            set

        Returns:
          ShaFile created from the object's raw chunks, using the object
          format this iterator was constructed with
        """
        assert unpacked.obj_type_num is not None and unpacked.obj_chunks is not None
        type_num = unpacked.obj_type_num
        chunks = unpacked.obj_chunks
        return ShaFile.from_raw_chunks(
            type_num,
            chunks,
            object_format=self._object_format,
        )

2488 

2489 

2490class SHA1Reader(BinaryIO): 

2491 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2492 

2493 def __init__(self, f: IO[bytes]) -> None: 

2494 """Initialize SHA1Reader. 

2495 

2496 Args: 

2497 f: File-like object to wrap 

2498 """ 

2499 self.f = f 

2500 self.sha1 = sha1(b"") 

2501 

2502 def read(self, size: int = -1) -> bytes: 

2503 """Read bytes and update SHA1. 

2504 

2505 Args: 

2506 size: Number of bytes to read, -1 for all 

2507 

2508 Returns: 

2509 Bytes read from file 

2510 """ 

2511 data = self.f.read(size) 

2512 self.sha1.update(data) 

2513 return data 

2514 

2515 def check_sha(self, allow_empty: bool = False) -> None: 

2516 """Check if the SHA1 matches the expected value. 

2517 

2518 Args: 

2519 allow_empty: Allow empty SHA1 hash 

2520 

2521 Raises: 

2522 ChecksumMismatch: If SHA1 doesn't match 

2523 """ 

2524 stored = self.f.read(20) 

2525 # If git option index.skipHash is set the index will be empty 

2526 if stored != self.sha1.digest() and ( 

2527 not allow_empty 

2528 or ( 

2529 len(stored) == 20 

2530 and sha_to_hex(RawObjectID(stored)) 

2531 != b"0000000000000000000000000000000000000000" 

2532 ) 

2533 ): 

2534 raise ChecksumMismatch( 

2535 self.sha1.hexdigest(), 

2536 sha_to_hex(RawObjectID(stored)) if stored else b"", 

2537 ) 

2538 

2539 def close(self) -> None: 

2540 """Close the underlying file.""" 

2541 return self.f.close() 

2542 

2543 def tell(self) -> int: 

2544 """Return current file position.""" 

2545 return self.f.tell() 

2546 

2547 # BinaryIO abstract methods 

2548 def readable(self) -> bool: 

2549 """Check if file is readable.""" 

2550 return True 

2551 

2552 def writable(self) -> bool: 

2553 """Check if file is writable.""" 

2554 return False 

2555 

2556 def seekable(self) -> bool: 

2557 """Check if file is seekable.""" 

2558 return getattr(self.f, "seekable", lambda: False)() 

2559 

2560 def seek(self, offset: int, whence: int = 0) -> int: 

2561 """Seek to position in file. 

2562 

2563 Args: 

2564 offset: Position offset 

2565 whence: Reference point (0=start, 1=current, 2=end) 

2566 

2567 Returns: 

2568 New file position 

2569 """ 

2570 return self.f.seek(offset, whence) 

2571 

2572 def flush(self) -> None: 

2573 """Flush the file buffer.""" 

2574 if hasattr(self.f, "flush"): 

2575 self.f.flush() 

2576 

2577 def readline(self, size: int = -1) -> bytes: 

2578 """Read a line from the file. 

2579 

2580 Args: 

2581 size: Maximum bytes to read 

2582 

2583 Returns: 

2584 Line read from file 

2585 """ 

2586 return self.f.readline(size) 

2587 

2588 def readlines(self, hint: int = -1) -> list[bytes]: 

2589 """Read all lines from the file. 

2590 

2591 Args: 

2592 hint: Approximate number of bytes to read 

2593 

2594 Returns: 

2595 List of lines 

2596 """ 

2597 return self.f.readlines(hint) 

2598 

2599 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2600 """Write multiple lines to the file (not supported).""" 

2601 raise UnsupportedOperation("writelines") 

2602 

2603 def write(self, data: bytes, /) -> int: # type: ignore[override] 

2604 """Write data to the file (not supported).""" 

2605 raise UnsupportedOperation("write") 

2606 

2607 def __enter__(self) -> Self: 

2608 """Enter context manager.""" 

2609 return self 

2610 

2611 def __exit__( 

2612 self, 

2613 type: type | None, 

2614 value: BaseException | None, 

2615 traceback: TracebackType | None, 

2616 ) -> None: 

2617 """Exit context manager and close file.""" 

2618 self.close() 

2619 

2620 def __iter__(self) -> "SHA1Reader": 

2621 """Return iterator for reading file lines.""" 

2622 return self 

2623 

2624 def __next__(self) -> bytes: 

2625 """Get next line from file. 

2626 

2627 Returns: 

2628 Next line 

2629 

2630 Raises: 

2631 StopIteration: When no more lines 

2632 """ 

2633 line = self.readline() 

2634 if not line: 

2635 raise StopIteration 

2636 return line 

2637 

2638 def fileno(self) -> int: 

2639 """Return file descriptor number.""" 

2640 return self.f.fileno() 

2641 

2642 def isatty(self) -> bool: 

2643 """Check if file is a terminal.""" 

2644 return getattr(self.f, "isatty", lambda: False)() 

2645 

2646 def truncate(self, size: int | None = None) -> int: 

2647 """Not supported for read-only file. 

2648 

2649 Raises: 

2650 UnsupportedOperation: Always raised 

2651 """ 

2652 raise UnsupportedOperation("truncate") 

2653 

2654 

2655class SHA1Writer(BinaryIO): 

2656 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2657 

2658 def __init__(self, f: BinaryIO | IO[bytes]) -> None: 

2659 """Initialize SHA1Writer. 

2660 

2661 Args: 

2662 f: File-like object to wrap 

2663 """ 

2664 self.f = f 

2665 self.length = 0 

2666 self.sha1 = sha1(b"") 

2667 self.digest: bytes | None = None 

2668 

2669 def write(self, data: bytes | bytearray | memoryview, /) -> int: # type: ignore[override] 

2670 """Write data and update SHA1. 

2671 

2672 Args: 

2673 data: Data to write 

2674 

2675 Returns: 

2676 Number of bytes written 

2677 """ 

2678 self.sha1.update(data) 

2679 written = self.f.write(data) 

2680 self.length += written 

2681 return written 

2682 

2683 def write_sha(self) -> bytes: 

2684 """Write the SHA1 digest to the file. 

2685 

2686 Returns: 

2687 The SHA1 digest bytes 

2688 """ 

2689 sha = self.sha1.digest() 

2690 assert len(sha) == 20 

2691 self.f.write(sha) 

2692 self.length += len(sha) 

2693 return sha 

2694 

2695 def close(self) -> None: 

2696 """Close the pack file and finalize the SHA.""" 

2697 self.digest = self.write_sha() 

2698 self.f.close() 

2699 

2700 def offset(self) -> int: 

2701 """Get the total number of bytes written. 

2702 

2703 Returns: 

2704 Total bytes written 

2705 """ 

2706 return self.length 

2707 

2708 def tell(self) -> int: 

2709 """Return current file position.""" 

2710 return self.f.tell() 

2711 

2712 # BinaryIO abstract methods 

2713 def readable(self) -> bool: 

2714 """Check if file is readable.""" 

2715 return False 

2716 

2717 def writable(self) -> bool: 

2718 """Check if file is writable.""" 

2719 return True 

2720 

2721 def seekable(self) -> bool: 

2722 """Check if file is seekable.""" 

2723 return getattr(self.f, "seekable", lambda: False)() 

2724 

2725 def seek(self, offset: int, whence: int = 0) -> int: 

2726 """Seek to position in file. 

2727 

2728 Args: 

2729 offset: Position offset 

2730 whence: Reference point (0=start, 1=current, 2=end) 

2731 

2732 Returns: 

2733 New file position 

2734 """ 

2735 return self.f.seek(offset, whence) 

2736 

2737 def flush(self) -> None: 

2738 """Flush the file buffer.""" 

2739 if hasattr(self.f, "flush"): 

2740 self.f.flush() 

2741 

2742 def readline(self, size: int = -1) -> bytes: 

2743 """Not supported for write-only file. 

2744 

2745 Raises: 

2746 UnsupportedOperation: Always raised 

2747 """ 

2748 raise UnsupportedOperation("readline") 

2749 

2750 def readlines(self, hint: int = -1) -> list[bytes]: 

2751 """Not supported for write-only file. 

2752 

2753 Raises: 

2754 UnsupportedOperation: Always raised 

2755 """ 

2756 raise UnsupportedOperation("readlines") 

2757 

2758 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2759 """Write multiple lines to the file. 

2760 

2761 Args: 

2762 lines: Iterable of lines to write 

2763 """ 

2764 for line in lines: 

2765 self.write(line) 

2766 

2767 def read(self, size: int = -1) -> bytes: 

2768 """Not supported for write-only file. 

2769 

2770 Raises: 

2771 UnsupportedOperation: Always raised 

2772 """ 

2773 raise UnsupportedOperation("read") 

2774 

2775 def __enter__(self) -> Self: 

2776 """Enter context manager.""" 

2777 return self 

2778 

2779 def __exit__( 

2780 self, 

2781 type: type | None, 

2782 value: BaseException | None, 

2783 traceback: TracebackType | None, 

2784 ) -> None: 

2785 """Exit context manager and close file.""" 

2786 self.f.close() 

2787 

2788 def __iter__(self) -> "SHA1Writer": 

2789 """Return iterator.""" 

2790 return self 

2791 

2792 def __next__(self) -> bytes: 

2793 """Not supported for write-only file. 

2794 

2795 Raises: 

2796 UnsupportedOperation: Always raised 

2797 """ 

2798 raise UnsupportedOperation("__next__") 

2799 

2800 def fileno(self) -> int: 

2801 """Return file descriptor number.""" 

2802 return self.f.fileno() 

2803 

2804 def isatty(self) -> bool: 

2805 """Check if file is a terminal.""" 

2806 return getattr(self.f, "isatty", lambda: False)() 

2807 

2808 def truncate(self, size: int | None = None) -> int: 

2809 """Not supported for write-only file. 

2810 

2811 Raises: 

2812 UnsupportedOperation: Always raised 

2813 """ 

2814 raise UnsupportedOperation("truncate") 

2815 

2816 

2817class HashWriter(BinaryIO): 

2818 """Wrapper for file-like object that computes hash of its data. 

2819 

2820 This is a generic version that works with any hash algorithm. 

2821 """ 

2822 

2823 def __init__( 

2824 self, f: BinaryIO | IO[bytes], hash_func: Callable[[], "HashObject"] 

2825 ) -> None: 

2826 """Initialize HashWriter. 

2827 

2828 Args: 

2829 f: File-like object to wrap 

2830 hash_func: Hash function (e.g., sha1, sha256) 

2831 """ 

2832 self.f = f 

2833 self.length = 0 

2834 self.hash_obj = hash_func() 

2835 self.digest: bytes | None = None 

2836 

2837 def write(self, data: bytes | bytearray | memoryview, /) -> int: # type: ignore[override] 

2838 """Write data and update hash. 

2839 

2840 Args: 

2841 data: Data to write 

2842 

2843 Returns: 

2844 Number of bytes written 

2845 """ 

2846 self.hash_obj.update(data) 

2847 written = self.f.write(data) 

2848 self.length += written 

2849 return written 

2850 

def write_hash(self) -> bytes:
    """Append the current hash digest to the wrapped file.

    The digest length is added to the running byte count.

    Returns:
        The hash digest bytes
    """
    trailer = self.hash_obj.digest()
    self.f.write(trailer)
    self.length += len(trailer)
    return trailer

2861 

def close(self) -> None:
    """Write the digest, remember it in ``digest`` and close the file."""
    trailer = self.write_hash()
    self.digest = trailer
    self.f.close()

2866 

def offset(self) -> int:
    """Report how many bytes went through this writer.

    Returns:
        Running total kept in ``length``
    """
    total = self.length
    return total

2874 

def tell(self) -> int:
    """Report the position of the wrapped file."""
    position = self.f.tell()
    return position

2878 

2879 # BinaryIO abstract methods 

def readable(self) -> bool:
    """Always False: this wrapper cannot be read from."""
    return False

2883 

def writable(self) -> bool:
    """Always True: this wrapper accepts writes."""
    return True

2887 

def seekable(self) -> bool:
    """Ask the wrapped file whether it can seek (False if it cannot say)."""
    probe = getattr(self.f, "seekable", lambda: False)
    return probe()

2891 

def seek(self, offset: int, whence: int = 0) -> int:
    """Move the position of the wrapped file.

    Args:
        offset: Position offset
        whence: Reference point (0=start, 1=current, 2=end)

    Returns:
        New file position as reported by the wrapped file
    """
    position = self.f.seek(offset, whence)
    return position

2903 

def flush(self) -> None:
    """Flush the wrapped file when it supports flushing."""
    if not hasattr(self.f, "flush"):
        return
    self.f.flush()

2908 

def readline(self, size: int = -1) -> bytes:
    """Reject line reads; this wrapper is write-only.

    Args:
        size: Ignored

    Raises:
        UnsupportedOperation: Always raised
    """
    operation = "readline"
    raise UnsupportedOperation(operation)

2916 

def readlines(self, hint: int = -1) -> list[bytes]:
    """Reject bulk line reads; this wrapper is write-only.

    Args:
        hint: Ignored

    Raises:
        UnsupportedOperation: Always raised
    """
    operation = "readlines"
    raise UnsupportedOperation(operation)

2924 

def writelines(self, lines: Iterable[bytes], /) -> None:  # type: ignore[override]
    """Write each item of an iterable through ``write``.

    No separators are added between the items.

    Args:
        lines: Iterable of byte strings to write
    """
    for payload in lines:
        self.write(payload)

2933 

def read(self, size: int = -1) -> bytes:
    """Reject reads; this wrapper is write-only.

    Args:
        size: Ignored

    Raises:
        UnsupportedOperation: Always raised
    """
    operation = "read"
    raise UnsupportedOperation(operation)

2941 

2942 def __enter__(self) -> Self: 

2943 """Enter context manager.""" 

2944 return self 

2945 

2946 def __exit__( 

2947 self, 

2948 type: type | None, 

2949 value: BaseException | None, 

2950 traceback: TracebackType | None, 

2951 ) -> None: 

2952 """Exit context manager and close file.""" 

2953 self.close() 

2954 

def __iter__(self) -> "HashWriter":
    """Return this object as its own iterator (iteration itself is unsupported)."""
    return self

2958 

def __next__(self) -> bytes:
    """Reject iteration; this wrapper is write-only.

    Raises:
        UnsupportedOperation: Always raised
    """
    operation = "__next__"
    raise UnsupportedOperation(operation)

2966 

def fileno(self) -> int:
    """Return the wrapped file's descriptor number."""
    descriptor = self.f.fileno()
    return descriptor

2970 

def isatty(self) -> bool:
    """Report whether the wrapped file is a terminal.

    Returns:
        The result of the wrapped file's ``isatty()``, or False when the
        wrapped object has no such method
    """
    try:
        probe = self.f.isatty
    except AttributeError:
        return False
    return probe()

2974 

2975 def truncate(self, size: int | None = None) -> int: 

2976 """Not supported for write-only file. 

2977 

2978 Raises: 

2979 UnsupportedOperation: Always raised 

2980 """ 

2981 raise UnsupportedOperation("truncate") 

2982 

2983 

def pack_object_header(
    type_num: int,
    delta_base: bytes | int | None,
    size: int,
    object_format: "ObjectFormat",
) -> bytearray:
    """Build the header that precedes an object's data in a pack.

    Args:
      type_num: Numeric type of the object.
      delta_base: Delta base offset (for OFS_DELTA) or base object id
        (for REF_DELTA), or None for whole objects.
      size: Uncompressed object size.
      object_format: Object format (hash algorithm) to use; its
        ``oid_length`` is checked against REF_DELTA bases.
    Returns: A header for a packed object.
    """
    # First byte carries the type in bits 4-6 and the low 4 bits of the size;
    # the rest of the size follows in 7-bit groups, least significant first,
    # with the high bit marking "more bytes follow".
    encoded = []
    current = (type_num << 4) | (size & 15)
    remaining = size >> 4
    while remaining:
        encoded.append(current | 0x80)
        current = remaining & 0x7F
        remaining >>= 7
    encoded.append(current)

    if type_num == OFS_DELTA:
        assert isinstance(delta_base, int)
        # The offset is emitted most significant group first; collect the
        # groups least-significant-first and reverse them at the end.
        groups = [delta_base & 0x7F]
        delta_base >>= 7
        while delta_base:
            delta_base -= 1
            groups.append(0x80 | (delta_base & 0x7F))
            delta_base >>= 7
        encoded.extend(reversed(groups))
    elif type_num == REF_DELTA:
        assert isinstance(delta_base, bytes)
        assert len(delta_base) == object_format.oid_length
        encoded += delta_base
    return bytearray(encoded)

3021 

3022 

def pack_object_chunks(
    type: int,
    object: list[bytes] | tuple[bytes | int, list[bytes]],
    object_format: "ObjectFormat",
    *,
    compression_level: int = -1,
) -> Iterator[bytes]:
    """Yield the pack header followed by the zlib-compressed object data.

    Args:
      type: Numeric type of the object
      object: Object to write; for delta types a ``(delta_base, data)`` tuple
      object_format: Object format (hash algorithm) to use
      compression_level: the zlib compression level
    Returns: Chunks
    """
    delta_base = None
    if type in DELTA_TYPES:
        if not isinstance(object, tuple):
            raise TypeError("Delta types require a tuple of (delta_base, object)")
        delta_base, object = object

    # Normalise the payload into a list of bytes chunks
    if isinstance(object, bytes):
        chunks = [object]
    elif isinstance(object, list):
        chunks = object
    elif isinstance(object, ShaFile):
        chunks = object.as_raw_chunks()
    else:
        # Shouldn't reach here with proper typing
        raise TypeError(f"Unexpected object type: {object.__class__.__name__}")

    uncompressed_size = sum(map(len, chunks))
    header = pack_object_header(
        type, delta_base, uncompressed_size, object_format=object_format
    )
    yield bytes(header)

    deflater = zlib.compressobj(level=compression_level)
    for piece in chunks:
        yield deflater.compress(piece)
    yield deflater.flush()

3067 

3068 

def write_pack_object(
    write: Callable[[bytes], int],
    type: int,
    object: list[bytes] | tuple[bytes | int, list[bytes]],
    object_format: "ObjectFormat",
    *,
    sha: "HashObject | None" = None,
    compression_level: int = -1,
) -> int:
    """Write a single pack object through a write callable.

    Args:
      write: Write function to use
      type: Numeric type of the object
      object: Object to write
      object_format: Object format (hash algorithm) to use
      sha: Optional hash object that is updated with every chunk written
      compression_level: the zlib compression level
    Returns: CRC32 checksum of the written object
    """
    checksum = 0
    pieces = pack_object_chunks(
        type, object, compression_level=compression_level, object_format=object_format
    )
    for piece in pieces:
        write(piece)
        if sha is not None:
            sha.update(piece)
        # Fold each chunk into the running CRC32.
        checksum = binascii.crc32(piece, checksum)
    return checksum & 0xFFFFFFFF

3098 

3099 

def write_pack(
    filename: str,
    objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]],
    object_format: "ObjectFormat",
    *,
    deltify: bool | None = None,
    delta_window_size: int | None = None,
    compression_level: int = -1,
) -> tuple[bytes, bytes]:
    """Write a pack file together with its index.

    Creates ``<filename>.pack`` and ``<filename>.idx``.

    Args:
      filename: Path to the new pack file (without .pack extension)
      objects: Objects to write to the pack
      object_format: Object format
      delta_window_size: Delta window size
      deltify: Whether to deltify pack objects
      compression_level: the zlib compression level
    Returns: Tuple with checksum of pack file and index file
    """
    pack_path = filename + ".pack"
    with GitFile(pack_path, "wb") as pack_file:
        entries, data_sum = write_pack_objects(
            pack_file,
            objects,
            delta_window_size=delta_window_size,
            deltify=deltify,
            compression_level=compression_level,
            object_format=object_format,
        )

    # Index entries are (name, offset, crc32) tuples in sorted order.
    index_entries = sorted(
        (name, info[0], info[1]) for name, info in entries.items()
    )
    index_path = filename + ".idx"
    with GitFile(index_path, "wb") as index_file:
        idx_sha = write_pack_index(index_file, index_entries, data_sum)
    return data_sum, idx_sha

3133 

3134 

def pack_header_chunks(num_objects: int) -> Iterator[bytes]:
    """Yield the three chunks of a pack header.

    Args:
      num_objects: Number of objects the pack will contain
    Returns: The ``PACK`` signature, then the pack version (always 2) and the
      object count, each as a big-endian 32-bit integer
    """
    yield b"PACK"  # Pack header
    # Pack version, then number of objects in pack
    for field in (2, num_objects):
        yield struct.pack(b">L", field)

3140 

3141 

def write_pack_header(
    write: Callable[[bytes], int] | IO[bytes], num_objects: int
) -> None:
    """Write a pack header for the given number of objects.

    Args:
      write: Write callable. A file-like object with a ``write`` method is
        still accepted but triggers a DeprecationWarning.
      num_objects: Number of objects in the pack
    """
    emit: Callable[[bytes], int]
    if not hasattr(write, "write"):
        emit = write
    else:
        # Legacy calling convention: a file object instead of a callable.
        emit = write.write
        warnings.warn(
            "write_pack_header() now takes a write rather than file argument",
            DeprecationWarning,
            stacklevel=2,
        )
    for piece in pack_header_chunks(num_objects):
        emit(piece)

3158 

3159 

def find_reusable_deltas(
    container: PackedObjectContainer,
    object_ids: Set[ObjectID],
    *,
    other_haves: Set[ObjectID] | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Yield existing REF_DELTA entries whose base is also available.

    A delta is reusable when its base is either among ``object_ids`` or
    among ``other_haves``.

    Args:
      container: Pack container to search for deltas
      object_ids: Set of object IDs to find deltas for
      other_haves: Set of other object IDs we have
      progress: Optional progress reporting callback

    Returns:
      Iterator of UnpackedObject entries that can be reused
    """
    haves = set() if other_haves is None else other_haves
    reuse_count = 0
    candidates = container.iter_unpacked_subset(
        object_ids, allow_missing=True, convert_ofs_delta=True
    )
    for position, unpacked in enumerate(candidates):
        if progress is not None and position % 1000 == 0:
            progress(
                f"checking for reusable deltas: {position}/{len(object_ids)}\r".encode()
            )
        if unpacked.pack_type_num != REF_DELTA:
            continue
        base_hex = sha_to_hex(unpacked.delta_base)  # type: ignore
        if base_hex in object_ids or base_hex in haves:
            yield unpacked
            reuse_count += 1
    if progress is not None:
        progress((f"found {reuse_count} deltas to reuse\n").encode())

3195 

3196 

def deltify_pack_objects(
    objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, bytes | None]],
    *,
    window_size: int | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Sort the given objects and generate deltas between them.

    Args:
      objects: An iterable of objects or (object, path) tuples to deltify.
      window_size: Window size; None for default
      progress: Optional progress reporting callback
    Returns: Iterator over UnpackedObject entries;
        delta_base is None for full text entries
    """

    def _attach_hints() -> Iterator[tuple[ShaFile, tuple[int, bytes | None]]]:
        # Pair every object with a (type_num, path) hint used for sorting.
        for entry in objects:
            if isinstance(entry, ShaFile):
                yield (entry, (entry.type_num, None))
                continue
            obj = entry[0]
            yield (obj, (obj.type_num, entry[1]))

    ordered = sort_objects_for_delta(_attach_hints())
    yield from deltas_from_sorted_objects(
        ordered,
        window_size=window_size,
        progress=progress,
    )

3226 

3227 

def sort_objects_for_delta(
    objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, PackHint | None]],
) -> Iterator[tuple[ShaFile, bytes | None]]:
    """Sort objects for optimal delta compression.

    Objects are ordered by type number, then path, then decreasing raw
    length, then by the objects themselves. Entries without a type number
    or without a path sort before entries that have one, so inputs mixing
    hinted and un-hinted objects can be sorted (``None`` cannot be compared
    with ``int`` or ``bytes`` directly).

    Args:
      objects: Iterator of objects or (object, hint) tuples, where a hint
        is a ``(type_num, path)`` pair or None

    Returns:
      Iterator of sorted (ShaFile, path) tuples
    """
    magic = []
    for entry in objects:
        if isinstance(entry, tuple):
            obj, hint = entry
            type_num, path = (None, None) if hint is None else hint
        else:
            obj, type_num, path = entry, None, None
        magic.append((type_num, path, -obj.raw_length(), obj))

    def _sort_key(item):  # type: ignore[no-untyped-def]
        # Prefix each optional field with a presence flag so that None never
        # gets compared against a real value; where all fields are present
        # (or all absent) this orders exactly like the plain tuples.
        type_num, path, neg_length, obj = item
        return (
            type_num is not None,
            0 if type_num is None else type_num,
            path is not None,
            b"" if path is None else path,
            neg_length,
            obj,
        )

    # Build a list of objects ordered by the magic Linus heuristic
    # This helps us find good objects to diff against us
    magic.sort(key=_sort_key)
    return ((x[3], x[1]) for x in magic)

3257 

3258 

def deltas_from_sorted_objects(
    objects: Iterator[tuple[ShaFile, bytes | None]],
    window_size: int | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Create deltas from sorted objects.

    Each object is compared against the most recently seen objects of the
    same type (at most ``window_size`` of them); the smallest delta that is
    strictly smaller than the best representation found so far wins,
    otherwise the full object is emitted.

    Args:
      objects: Iterator of sorted (object, path) tuples to deltify
      window_size: Delta window size; None for default
      progress: Optional progress reporting callback

    Returns:
      Iterator of UnpackedObject entries
    """
    # TODO(jelmer): Use threads
    if window_size is None:
        window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE

    def _delta_if_smaller(base_bytes, target_bytes, limit):  # type: ignore[no-untyped-def]
        # Collect delta chunks, giving up as soon as the delta is no
        # smaller than ``limit``. Returns None when the delta loses.
        pieces = []
        total = 0
        for piece in create_delta(base_bytes, target_bytes):
            total += len(piece)
            if total >= limit:
                return None
            pieces.append(piece)
        return pieces

    # Most recent candidates first: (object id, type number, raw bytes)
    recent_bases: deque[tuple[bytes, int, bytes]] = deque()
    for position, (obj, path) in enumerate(objects):
        if progress is not None and position % 1000 == 0:
            progress((f"generating deltas: {position}\r").encode())
        full_chunks = obj.as_raw_chunks()
        full_bytes = b"".join(full_chunks)  # Join once for efficiency
        best_chunks = full_chunks
        best_len = sum(map(len, best_chunks))
        best_base = None
        for candidate_id, candidate_type, candidate_bytes in recent_bases:
            if candidate_type != obj.type_num:
                continue
            delta_chunks = _delta_if_smaller(candidate_bytes, full_bytes, best_len)
            if delta_chunks is None:
                continue
            best_base = candidate_id
            best_chunks = delta_chunks
            best_len = sum(map(len, best_chunks))
        yield UnpackedObject(
            obj.type_num,
            sha=obj.sha().digest(),
            delta_base=best_base,
            decomp_len=best_len,
            decomp_chunks=best_chunks,
        )
        recent_bases.appendleft((obj.sha().digest(), obj.type_num, full_bytes))
        # Drop the oldest candidates once the window is full.
        while len(recent_bases) > window_size:
            recent_bases.pop()

3311 

3312 

def pack_objects_to_data(
    objects: Sequence[ShaFile]
    | Sequence[tuple[ShaFile, bytes | None]]
    | Sequence[tuple[ShaFile, PackHint | None]],
    *,
    deltify: bool | None = None,
    delta_window_size: int | None = None,
    ofs_delta: bool = True,
    progress: Callable[..., None] | None = None,
) -> tuple[int, Iterator[UnpackedObject]]:
    """Create pack data from objects.

    Args:
      objects: Pack objects, either bare or as (object, hint) tuples
      deltify: Whether to deltify pack objects; None currently means False
      delta_window_size: Delta window size
      ofs_delta: Whether to use offset deltas (not used by this function)
      progress: Optional progress reporting callback
    Returns: Tuple of (number of objects, iterator of UnpackedObject)
    """
    count = len(objects)
    if deltify is None:
        # PERFORMANCE/TODO(jelmer): This should be enabled but the python
        # implementation is *much* too slow at the moment.
        # Maybe consider enabling it just if the rust extension is available?
        deltify = False

    if not deltify:

        def _full_objects() -> Iterator[UnpackedObject]:
            # Strip any hint and emit every object undeltified.
            for item in objects:
                yield full_unpacked_object(item[0] if isinstance(item, tuple) else item)

        return (count, _full_objects())

    deltified = deltify_pack_objects(
        iter(objects),  # type: ignore
        window_size=delta_window_size,
        progress=progress,
    )
    return (count, deltified)

3358 

3359 

def generate_unpacked_objects(
    container: PackedObjectContainer,
    object_ids: Sequence[tuple[ObjectID, PackHint | None]],
    delta_window_size: int | None = None,
    deltify: bool | None = None,
    reuse_deltas: bool = True,
    ofs_delta: bool = True,
    other_haves: set[ObjectID] | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Create pack data from objects in a container.

    Reusable deltas are yielded first (when ``reuse_deltas`` is set); the
    remaining objects are then either deltified or emitted in full.

    Args:
      container: Container to fetch the objects from
      object_ids: Sequence of (object_id, hint) tuples to emit
      delta_window_size: Delta window size; None for default
      deltify: Whether to deltify objects; None currently means False
      reuse_deltas: Whether to reuse existing deltas
      ofs_delta: Whether to use offset deltas (not used by this function)
      other_haves: Set of additional object IDs the receiver has
      progress: Optional progress reporting callback
    Returns: Iterator of UnpackedObject entries
    """
    pending = dict(object_ids)
    if reuse_deltas:
        reusable = find_reusable_deltas(
            container, set(pending), other_haves=other_haves, progress=progress
        )
        for unpacked in reusable:
            # Anything reused no longer needs to be generated below.
            del pending[sha_to_hex(RawObjectID(unpacked.sha()))]
            yield unpacked
    if deltify is None:
        # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
        # slow at the moment.
        deltify = False

    if not deltify:
        for oid in pending:
            yield full_unpacked_object(container[oid])
        return

    remaining = container.iterobjects_subset(pending.keys(), allow_missing=False)
    ordered = sort_objects_for_delta((obj, pending[obj.id]) for obj in remaining)
    yield from deltas_from_sorted_objects(
        ordered,
        window_size=delta_window_size,
        progress=progress,
    )

3398 

3399 

def full_unpacked_object(o: ShaFile) -> UnpackedObject:
    """Wrap a ShaFile as an undeltified UnpackedObject.

    Args:
      o: ShaFile object to convert

    Returns:
      UnpackedObject carrying the object's raw chunks and digest, with no
      delta base and no CRC32
    """
    type_num = o.type_num
    raw_chunks = o.as_raw_chunks()
    digest = o.sha().digest()
    return UnpackedObject(
        type_num,
        delta_base=None,
        crc32=None,
        decomp_chunks=raw_chunks,
        sha=digest,
    )

3416 

3417 

def write_pack_from_container(
    write: Callable[[bytes], None]
    | Callable[[bytes | bytearray | memoryview], int]
    | IO[bytes],
    container: PackedObjectContainer,
    object_ids: Sequence[tuple[ObjectID, PackHint | None]],
    object_format: "ObjectFormat",
    *,
    delta_window_size: int | None = None,
    deltify: bool | None = None,
    reuse_deltas: bool = True,
    compression_level: int = -1,
    other_haves: set[ObjectID] | None = None,
) -> tuple[dict[bytes, tuple[int, int]], bytes]:
    """Write pack data for objects taken from a container.

    Args:
      write: write function (or file-like object) to use
      container: PackedObjectContainer
      object_ids: Sequence of (object_id, hint) tuples to write
      object_format: Object format (hash algorithm) to use
      delta_window_size: Sliding window size for searching for deltas;
                         Set to None for default window size.
      deltify: Whether to deltify objects
      reuse_deltas: Whether to reuse existing deltas
      compression_level: the zlib compression level to use
      other_haves: Set of additional object IDs the receiver has
    Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
    """
    record_count = len(object_ids)
    records = generate_unpacked_objects(
        container,
        object_ids,
        delta_window_size=delta_window_size,
        deltify=deltify,
        reuse_deltas=reuse_deltas,
        other_haves=other_haves,
    )
    return write_pack_data(
        write,
        records,
        num_records=record_count,
        compression_level=compression_level,
        object_format=object_format,
    )

3464 

3465 

def write_pack_objects(
    write: Callable[[bytes], None] | IO[bytes],
    objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]],
    object_format: "ObjectFormat",
    *,
    delta_window_size: int | None = None,
    deltify: bool | None = None,
    compression_level: int = -1,
) -> tuple[dict[bytes, tuple[int, int]], bytes]:
    """Write a new pack data file.

    Args:
      write: write function (or file-like object) to use
      objects: Sequence of objects or (object, path) tuples to write
      object_format: Object format (hash algorithm) to use
      delta_window_size: Sliding window size for searching for deltas;
                         Set to None for default window size.
      deltify: Whether to deltify objects
      compression_level: the zlib compression level to use
    Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
    """
    # Forward the window size as well; it only takes effect when deltifying.
    pack_contents_count, pack_contents = pack_objects_to_data(
        objects,
        deltify=deltify,
        delta_window_size=delta_window_size,
    )

    return write_pack_data(
        write,
        pack_contents,
        num_records=pack_contents_count,
        compression_level=compression_level,
        object_format=object_format,
    )

3496 

3497 

class PackChunkGenerator:
    """Lazily produce the byte chunks that make up a pack data file.

    While the chunks are consumed, ``entries`` is filled with a mapping of
    object id -> (offset, crc32) and ``cs`` accumulates the pack checksum.
    """

    def __init__(
        self,
        object_format: "ObjectFormat",
        num_records: int | None = None,
        records: Iterator[UnpackedObject] | None = None,
        progress: Callable[..., None] | None = None,
        compression_level: int = -1,
        reuse_compressed: bool = True,
    ) -> None:
        """Initialize PackChunkGenerator.

        Args:
            object_format: Object format (hash algorithm) to use
            num_records: Expected number of records
            records: Iterator of pack records; None means no records
            progress: Optional progress callback
            compression_level: Compression level (-1 for default)
            reuse_compressed: Whether to reuse compressed chunks
        """
        self.object_format = object_format
        self.cs = object_format.new_hash()
        self.entries: dict[bytes, tuple[int, int]] = {}
        source = iter([]) if records is None else records
        self._it = self._pack_data_chunks(
            records=source,
            num_records=num_records,
            progress=progress,
            compression_level=compression_level,
            reuse_compressed=reuse_compressed,
        )

    def sha1digest(self) -> bytes:
        """Return the digest of the pack data hashed so far."""
        return self.cs.digest()

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over pack data chunks."""
        return self._it

    def _pack_data_chunks(
        self,
        records: Iterator[UnpackedObject],
        *,
        num_records: int | None = None,
        progress: Callable[..., None] | None = None,
        compression_level: int = -1,
        reuse_compressed: bool = True,
    ) -> Iterator[bytes]:
        """Iterate pack data file chunks.

        Args:
            records: Iterator over UnpackedObject
            num_records: Number of records (defaults to len(records) if not specified)
            progress: Function to report progress to
            compression_level: the zlib compression level
            reuse_compressed: Whether to reuse compressed chunks
        Returns: Iterator over the header chunks, every object's chunks and
            finally the pack checksum
        Raises:
            AssertionError: If the number of records written differs from
                num_records
        """
        # Write the pack
        if num_records is None:
            num_records = len(records)  # type: ignore
        offset = 0
        for header_chunk in pack_header_chunks(num_records):
            yield header_chunk
            self.cs.update(header_chunk)
            offset += len(header_chunk)

        written = 0
        for position, unpacked in enumerate(records):
            type_num = unpacked.pack_type_num
            if progress is not None and position % 1000 == 0:
                progress(
                    (f"writing pack data: {position}/{num_records}\r").encode("ascii")
                )

            raw: list[bytes] | tuple[int, list[bytes]] | tuple[bytes, list[bytes]]
            if unpacked.delta_base is None:
                raw = unpacked.decomp_chunks
            else:
                assert isinstance(unpacked.delta_base, bytes), (
                    f"Expected bytes, got {type(unpacked.delta_base)}"
                )
                if unpacked.delta_base in self.entries:
                    # Base already written to this pack: refer to it by the
                    # distance back from the current offset.
                    base_offset, _base_crc32 = self.entries[unpacked.delta_base]
                    type_num = OFS_DELTA
                    raw = (offset - base_offset, unpacked.decomp_chunks)
                else:
                    # Base is not in this pack (yet): refer to it by id.
                    type_num = REF_DELTA
                    assert isinstance(unpacked.delta_base, bytes)
                    raw = (unpacked.delta_base, unpacked.decomp_chunks)

            object_chunks: list[bytes] | Iterator[bytes]
            if unpacked.comp_chunks is not None and reuse_compressed:
                object_chunks = unpacked.comp_chunks
            else:
                object_chunks = pack_object_chunks(
                    type_num,
                    raw,
                    compression_level=compression_level,
                    object_format=self.object_format,
                )

            crc32 = 0
            object_size = 0
            for piece in object_chunks:
                yield piece
                crc32 = binascii.crc32(piece, crc32)
                self.cs.update(piece)
                object_size += len(piece)
            written += 1
            self.entries[unpacked.sha()] = (offset, crc32)
            offset += object_size

        if written != num_records:
            raise AssertionError(
                f"actual records written differs: {written} != {num_records}"
            )

        yield self.cs.digest()

3615 

3616 

def write_pack_data(
    write: Callable[[bytes], None]
    | Callable[[bytes | bytearray | memoryview], int]
    | IO[bytes],
    records: Iterator[UnpackedObject],
    object_format: "ObjectFormat",
    *,
    num_records: int | None = None,
    progress: Callable[..., None] | None = None,
    compression_level: int = -1,
) -> tuple[dict[bytes, tuple[int, int]], bytes]:
    """Write a new pack data file.

    Args:
      write: Write function to use; an object with a ``write`` method is
        accepted as well
      records: Iterator over UnpackedObject entries to write
      object_format: Object format (hash algorithm) to use
      num_records: Number of records (defaults to len(records) if None)
      progress: Function to report progress to
      compression_level: the zlib compression level
    Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
    """
    generator = PackChunkGenerator(
        num_records=num_records,
        records=records,
        progress=progress,
        compression_level=compression_level,
        object_format=object_format,
    )
    # Resolve the sink once: a callable is used directly, anything else is
    # treated as a file-like object.
    emit = write if callable(write) else write.write
    for piece in generator:
        emit(piece)
    return generator.entries, generator.sha1digest()

3652 

3653 

def write_pack_index_v1(
    f: IO[bytes],
    entries: Iterable[tuple[bytes, int, int | None]],
    pack_checksum: bytes,
) -> bytes:
    """Write a new pack index file in the v1 format.

    Args:
      f: A file-like object to write to
      entries: Iterable of tuples with object name (sha), offset_in_pack,
        and crc32_checksum. The crc32 is not stored in a v1 index.
      pack_checksum: Checksum of the pack file.
    Returns: The SHA of the written index file
    Raises:
      TypeError: If an object name is not 20 bytes long or an offset does
        not fit in 32 bits
    """
    f = SHA1Writer(f)
    # Materialize the entries: they are walked twice (fan-out counts, then
    # the entry table), which would silently drop every entry if a one-shot
    # iterator were passed in.
    entries_list = list(entries)
    fan_out_table: dict[int, int] = defaultdict(lambda: 0)
    for name, _offset, _entry_checksum in entries_list:
        fan_out_table[ord(name[:1])] += 1
    # Fan-out table: cumulative count of names whose first byte is <= i
    for i in range(0x100):
        f.write(struct.pack(">L", fan_out_table[i]))
        fan_out_table[i + 1] += fan_out_table[i]
    for name, offset, _entry_checksum in entries_list:
        if len(name) != 20:
            raise TypeError("pack index v1 only supports SHA-1 names")
        # Offsets are stored as unsigned 32-bit integers.
        if not (offset <= 0xFFFFFFFF):
            raise TypeError("pack format 1 only supports offsets < 2Gb")
        f.write(struct.pack(">L20s", offset, name))
    assert len(pack_checksum) == 20
    f.write(pack_checksum)
    return f.write_sha()

3685 

3686 

def _delta_encode_size(size: int) -> bytes:
    """Encode a size as used in delta headers.

    The value is split into 7-bit groups, least significant first; every
    byte except the last has its high bit set.

    Args:
      size: Non-negative size to encode
    Returns: The encoded bytes
    """
    encoded = bytearray()
    while True:
        low_bits = size & 0x7F
        size >>= 7
        if not size:
            # Last group: high bit stays clear.
            encoded.append(low_bits)
            return bytes(encoded)
        encoded.append(low_bits | 0x80)

3697 

3698 

# The length of a delta copy operation in version 2 packs is limited to 64K
# (it is encoded in at most two bytes). To copy more, we emit several copy
# operations. Version 3 packs allow 24-bit lengths in copy operations, but we
# always make version 2 packs.
_MAX_COPY_LEN = 0xFFFF

3703 

3704 

def _encode_copy_operation(start: int, length: int) -> bytes:
    """Encode a delta copy instruction.

    The first byte has the high bit set; bits 0-3 flag which of the four
    offset bytes follow and bits 4-5 flag which of the two length bytes
    follow. Zero-valued bytes are omitted.

    Args:
      start: Offset in the base buffer to copy from
      length: Number of bytes to copy
    Returns: The encoded instruction
    """
    flags = 0x80
    operands = bytearray()
    for byte_index in range(4):
        shift = byte_index * 8
        if start & (0xFF << shift):
            operands.append((start >> shift) & 0xFF)
            flags |= 1 << byte_index
    for byte_index in range(2):
        shift = byte_index * 8
        if length & (0xFF << shift):
            operands.append((length >> shift) & 0xFF)
            flags |= 1 << (4 + byte_index)
    return bytes([flags]) + bytes(operands)

3716 

3717 

def _create_delta_py(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
    """Use python difflib to work out how to transform base_buf to target_buf.

    Yields the delta header (base size, target size) followed by copy and
    insert instructions.

    Args:
      base_buf: Base buffer
      target_buf: Target buffer
    Returns: Iterator over delta chunks
    """
    if isinstance(base_buf, list):
        base_buf = b"".join(base_buf)
    if isinstance(target_buf, list):
        target_buf = b"".join(target_buf)
    assert isinstance(base_buf, bytes)
    assert isinstance(target_buf, bytes)
    # write delta header
    yield _delta_encode_size(len(base_buf))
    yield _delta_encode_size(len(target_buf))
    # write out delta opcodes
    matcher = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf)
    target_view = memoryview(target_buf)
    for tag, base_lo, base_hi, target_lo, target_hi in matcher.get_opcodes():
        # Git patch opcodes don't care about deletes, so "delete" produces
        # nothing at all.
        if tag == "equal":
            # Unchanged range: tell the unpacker which part of base_buf to
            # copy, split into pieces of at most _MAX_COPY_LEN bytes.
            position = base_lo
            remaining = base_hi - base_lo
            while remaining > 0:
                step = min(remaining, _MAX_COPY_LEN)
                yield _encode_copy_operation(position, step)
                position += step
                remaining -= step
        elif tag in ("replace", "insert"):
            # New data: emit it literally, each piece prefixed by its size
            # (at most 127 bytes per piece).
            position = target_lo
            remaining = target_hi - target_lo
            while remaining > 127:
                yield bytes([127])
                yield bytes(target_view[position : position + 127])
                remaining -= 127
                position += 127
            yield bytes([remaining])
            yield bytes(target_view[position : position + remaining])

3762 

3763 

# Default to pure Python implementation; ``create_delta`` is the name the rest
# of this module calls when it needs a delta.
create_delta = _create_delta_py

3766 

3767 

def apply_delta(
    src_buf: bytes | list[bytes], delta: bytes | list[bytes]
) -> list[bytes]:
    """Apply a git delta to a source buffer.

    Based on the similar function in git's patch-delta.c.

    Args:
      src_buf: Source buffer (bytes or a list of chunks)
      delta: Delta instructions (bytes or a list of chunks)
    Returns: List of chunks that make up the reconstructed target
    Raises:
      ApplyDeltaError: If the delta is truncated, contains an invalid
        opcode, does not match the source size, or does not produce the
        announced target size
    """
    if not isinstance(src_buf, bytes):
        src_buf = b"".join(src_buf)
    if not isinstance(delta, bytes):
        delta = b"".join(delta)
    out = []
    index = 0
    delta_length = len(delta)

    def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]:
        size = 0
        i = 0
        while True:
            # Bound-check explicitly: ``delta[index:index+1]`` silently
            # returns b"" past the end, which would crash with TypeError
            # in ``ord`` and leave the caller unable to distinguish a
            # truncated delta from a programming bug.
            if index >= delta_length:
                raise ApplyDeltaError("delta truncated in size header")
            cmd = ord(delta[index : index + 1])
            index += 1
            size |= (cmd & ~0x80) << i
            i += 7
            if not cmd & 0x80:
                break
        return size, index

    def read_copy_operand(index: int) -> tuple[int, int]:
        # Same reasoning as for the size header: report a truncated copy
        # instruction as ApplyDeltaError rather than a TypeError from ord().
        if index >= delta_length:
            raise ApplyDeltaError("delta truncated in copy operation")
        return delta[index], index + 1

    src_size, index = get_delta_header_size(delta, index)
    dest_size, index = get_delta_header_size(delta, index)
    if src_size != len(src_buf):
        raise ApplyDeltaError(
            f"Unexpected source buffer size: {src_size} vs {len(src_buf)}"
        )
    while index < delta_length:
        cmd = delta[index]
        index += 1
        if cmd & 0x80:
            # Copy from the source: bits 0-3 select the offset bytes present
            cp_off = 0
            for i in range(4):
                if cmd & (1 << i):
                    x, index = read_copy_operand(index)
                    cp_off |= x << (i * 8)
            cp_size = 0
            # Version 3 packs can contain copy sizes larger than 64K.
            for i in range(3):
                if cmd & (1 << (4 + i)):
                    x, index = read_copy_operand(index)
                    cp_size |= x << (i * 8)
            if cp_size == 0:
                cp_size = 0x10000
            if (
                cp_off + cp_size < cp_size
                or cp_off + cp_size > src_size
                or cp_size > dest_size
            ):
                # Out-of-range copy: stop here and let the checks below
                # report the inconsistency.
                break
            out.append(src_buf[cp_off : cp_off + cp_size])
        elif cmd != 0:
            # Literal insert of the next ``cmd`` bytes of the delta
            if index + cmd > delta_length:
                raise ApplyDeltaError("delta truncated in insert operation")
            out.append(delta[index : index + cmd])
            index += cmd
        else:
            raise ApplyDeltaError("Invalid opcode 0")

    if index != delta_length:
        raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}")

    if dest_size != chunks_length(out):
        raise ApplyDeltaError("dest size incorrect")

    return out

3848 

3849 

def write_pack_index_v2(
    f: IO[bytes],
    entries: Iterable[tuple[bytes, int, int | None]],
    pack_checksum: bytes,
) -> bytes:
    """Write a new pack index file in the v2 format.

    Args:
      f: File-like object to write to
      entries: Iterable of tuples with object name (sha), offset_in_pack, and
        crc32_checksum.
      pack_checksum: Checksum of the pack file; its length (20 or 32 bytes)
        selects SHA-1 or SHA-256 for the index checksum.
    Returns: The checksum of the index file written
    Raises:
      ValueError: If pack_checksum has an unsupported length
      TypeError: If an object name has a different length than the others
    """
    # Determine hash algorithm from pack_checksum length
    hash_func = {20: sha1, 32: sha256}.get(len(pack_checksum))
    if hash_func is None:
        raise ValueError(f"Unsupported pack checksum length: {len(pack_checksum)}")

    writer = HashWriter(f, hash_func)
    writer.write(b"\377tOc")  # Magic!
    writer.write(struct.pack(">L", 2))

    # Convert to list to allow multiple iterations
    entries_list = list(entries)

    per_bucket: dict[int, int] = defaultdict(lambda: 0)
    for name, _offset, _crc in entries_list:
        per_bucket[ord(name[:1])] += 1

    # Without entries, fall back to the pack checksum length as hash size
    hash_size = len(entries_list[0][0]) if entries_list else len(pack_checksum)

    # Fan-out table: running total of names whose first byte is <= bucket
    running_total = 0
    for bucket in range(0x100):
        running_total += per_bucket[bucket]
        writer.write(struct.pack(b">L", running_total))

    # Object names
    for name, _offset, _crc in entries_list:
        if len(name) != hash_size:
            raise TypeError(
                f"Object name has wrong length: expected {hash_size}, got {len(name)}"
            )
        writer.write(name)

    # CRC32 checksums
    for _name, _offset, crc in entries_list:
        writer.write(struct.pack(b">L", crc))

    # 32-bit offsets; larger ones are replaced by an index into a 64-bit
    # table, flagged by the high bit.
    large_offsets: list[int] = []
    for _name, offset, _crc in entries_list:
        if offset < 2**31:
            writer.write(struct.pack(b">L", offset))
        else:
            writer.write(struct.pack(b">L", 2**31 + len(large_offsets)))
            large_offsets.append(offset)
    for offset in large_offsets:
        writer.write(struct.pack(b">Q", offset))

    writer.write(pack_checksum)
    return writer.write_hash()

3911 

3912 

3913def write_pack_index_v3( 

3914 f: IO[bytes], 

3915 entries: Iterable[tuple[bytes, int, int | None]], 

3916 pack_checksum: bytes, 

3917 hash_format: int = 1, 

3918) -> bytes: 

3919 """Write a new pack index file in v3 format. 

3920 

3921 Args: 

3922 f: File-like object to write to 

3923 entries: List of tuples with object name (sha), offset_in_pack, and 

3924 crc32_checksum. 

3925 pack_checksum: Checksum of the pack file. 

3926 hash_format: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

3927 Returns: The SHA of the index file written 

3928 """ 

3929 if hash_format == 1: 

3930 hash_size = 20 # SHA-1 

3931 writer_cls = SHA1Writer 

3932 elif hash_format == 2: 

3933 hash_size = 32 # SHA-256 

3934 # TODO: Add SHA256Writer when SHA-256 support is implemented 

3935 raise NotImplementedError("SHA-256 support not yet implemented") 

3936 else: 

3937 raise ValueError(f"Unknown hash algorithm {hash_format}") 

3938 

3939 # Convert entries to list to allow multiple iterations 

3940 entries_list = list(entries) 

3941 

3942 # Calculate shortest unambiguous prefix length for object names 

3943 # For now, use full hash size (this could be optimized) 

3944 shortened_oid_len = hash_size 

3945 

3946 f = writer_cls(f) 

3947 f.write(b"\377tOc") # Magic! 

3948 f.write(struct.pack(">L", 3)) # Version 3 

3949 f.write(struct.pack(">L", hash_format)) # Hash algorithm 

3950 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length 

3951 

3952 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3953 for name, offset, entry_checksum in entries_list: 

3954 if len(name) != hash_size: 

3955 raise ValueError( 

3956 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3957 ) 

3958 fan_out_table[ord(name[:1])] += 1 

3959 

3960 # Fan-out table 

3961 largetable: list[int] = [] 

3962 for i in range(0x100): 

3963 f.write(struct.pack(b">L", fan_out_table[i])) 

3964 fan_out_table[i + 1] += fan_out_table[i] 

3965 

3966 # Object names table 

3967 for name, offset, entry_checksum in entries_list: 

3968 f.write(name) 

3969 

3970 # CRC32 checksums table 

3971 for name, offset, entry_checksum in entries_list: 

3972 f.write(struct.pack(b">L", entry_checksum)) 

3973 

3974 # Offset table 

3975 for name, offset, entry_checksum in entries_list: 

3976 if offset < 2**31: 

3977 f.write(struct.pack(b">L", offset)) 

3978 else: 

3979 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3980 largetable.append(offset) 

3981 

3982 # Large offset table 

3983 for offset in largetable: 

3984 f.write(struct.pack(b">Q", offset)) 

3985 

3986 assert len(pack_checksum) == hash_size, ( 

3987 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}" 

3988 ) 

3989 f.write(pack_checksum) 

3990 return f.write_sha() 

3991 

3992 

3993def write_pack_index( 

3994 f: IO[bytes], 

3995 entries: Iterable[tuple[bytes, int, int | None]], 

3996 pack_checksum: bytes, 

3997 progress: Callable[..., None] | None = None, 

3998 version: int | None = None, 

3999) -> bytes: 

4000 """Write a pack index file. 

4001 

4002 Args: 

4003 f: File-like object to write to. 

4004 entries: List of (checksum, offset, crc32) tuples 

4005 pack_checksum: Checksum of the pack file. 

4006 progress: Progress function (not currently used) 

4007 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION. 

4008 

4009 Returns: 

4010 SHA of the written index file 

4011 

4012 Raises: 

4013 ValueError: If an unsupported version is specified 

4014 """ 

4015 if version is None: 

4016 version = DEFAULT_PACK_INDEX_VERSION 

4017 

4018 if version == 1: 

4019 return write_pack_index_v1(f, entries, pack_checksum) 

4020 elif version == 2: 

4021 return write_pack_index_v2(f, entries, pack_checksum) 

4022 elif version == 3: 

4023 return write_pack_index_v3(f, entries, pack_checksum) 

4024 else: 

4025 raise ValueError(f"Unsupported pack index version: {version}") 

4026 

4027 

4028class Pack: 

4029 """A Git pack object.""" 

4030 

4031 _data_load: Callable[[], PackData] | None 

4032 _idx_load: Callable[[], PackIndex] | None 

4033 

4034 _data: PackData | None 

4035 _idx: PackIndex | None 

4036 _bitmap: "PackBitmap | None" 

4037 

4038 def __init__( 

4039 self, 

4040 basename: str, 

4041 *, 

4042 object_format: ObjectFormat, 

4043 resolve_ext_ref: ResolveExtRefFn | None = None, 

4044 delta_window_size: int | None = None, 

4045 window_memory: int | None = None, 

4046 delta_cache_size: int | None = None, 

4047 depth: int | None = None, 

4048 threads: int | None = None, 

4049 big_file_threshold: int | None = None, 

4050 delta_base_cache_limit: int | None = None, 

4051 ) -> None: 

4052 """Initialize a Pack object. 

4053 

4054 Args: 

4055 basename: Base path for pack files (without .pack/.idx extension) 

4056 object_format: Hash algorithm used by the repository 

4057 resolve_ext_ref: Optional function to resolve external references 

4058 delta_window_size: Size of the delta compression window 

4059 window_memory: Memory limit for delta compression window 

4060 delta_cache_size: Size of the delta cache 

4061 depth: Maximum depth for delta chains 

4062 threads: Number of threads to use for operations 

4063 big_file_threshold: Size threshold for big file handling 

4064 delta_base_cache_limit: Maximum bytes for delta base object cache 

4065 """ 

4066 self._basename = basename 

4067 self.object_format = object_format 

4068 self._data = None 

4069 self._idx = None 

4070 self._bitmap = None 

4071 self._idx_path = self._basename + ".idx" 

4072 self._data_path = self._basename + ".pack" 

4073 self._bitmap_path = self._basename + ".bitmap" 

4074 self.delta_window_size = delta_window_size 

4075 self.window_memory = window_memory 

4076 self.delta_cache_size = delta_cache_size 

4077 self.depth = depth 

4078 self.threads = threads 

4079 self.big_file_threshold = big_file_threshold 

4080 self.delta_base_cache_limit = delta_base_cache_limit 

4081 self._idx_load = lambda: load_pack_index(self._idx_path, object_format) 

4082 self._data_load = lambda: PackData( 

4083 self._data_path, 

4084 delta_window_size=delta_window_size, 

4085 window_memory=window_memory, 

4086 delta_cache_size=delta_cache_size, 

4087 depth=depth, 

4088 threads=threads, 

4089 big_file_threshold=big_file_threshold, 

4090 delta_base_cache_limit=delta_base_cache_limit, 

4091 object_format=object_format, 

4092 ) 

4093 self.resolve_ext_ref = resolve_ext_ref 

4094 

4095 @classmethod 

4096 def from_lazy_objects( 

4097 cls, 

4098 data_fn: Callable[[], PackData], 

4099 idx_fn: Callable[[], PackIndex], 

4100 ) -> "Pack": 

4101 """Create a new pack object from callables to load pack data and index objects.""" 

4102 # Load index to get object format 

4103 idx = idx_fn() 

4104 ret = cls("", object_format=idx.object_format) 

4105 ret._data_load = data_fn 

4106 ret._idx = idx 

4107 ret._idx_load = None 

4108 return ret 

4109 

4110 @classmethod 

4111 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack": 

4112 """Create a new pack object from pack data and index objects.""" 

4113 ret = cls("", object_format=idx.object_format) 

4114 ret._data = data 

4115 ret._data_load = None 

4116 ret._idx = idx 

4117 ret._idx_load = None 

4118 ret.check_length_and_checksum() 

4119 return ret 

4120 

4121 def name(self) -> bytes: 

4122 """The SHA over the SHAs of the objects in this pack.""" 

4123 return self.index.objects_sha1() 

4124 

4125 @property 

4126 def data(self) -> PackData: 

4127 """The pack data object being used.""" 

4128 if self._data is None: 

4129 assert self._data_load 

4130 try: 

4131 self._data = self._data_load() 

4132 except FileNotFoundError as exc: 

4133 raise PackFileDisappeared(self) from exc 

4134 self.check_length_and_checksum() 

4135 return self._data 

4136 

4137 @property 

4138 def index(self) -> PackIndex: 

4139 """The index being used. 

4140 

4141 Note: This may be an in-memory index 

4142 """ 

4143 if self._idx is None: 

4144 assert self._idx_load 

4145 try: 

4146 self._idx = self._idx_load() 

4147 except FileNotFoundError as exc: 

4148 raise PackFileDisappeared(self) from exc 

4149 return self._idx 

4150 

4151 @property 

4152 def bitmap(self) -> "PackBitmap | None": 

4153 """The bitmap being used, if available. 

4154 

4155 Returns: 

4156 PackBitmap instance or None if no bitmap exists 

4157 

4158 Raises: 

4159 ValueError: If bitmap file is invalid or corrupt 

4160 """ 

4161 if self._bitmap is None: 

4162 from .bitmap import read_bitmap 

4163 

4164 self._bitmap = read_bitmap(self._bitmap_path, pack_index=self.index) 

4165 return self._bitmap 

4166 

4167 def ensure_bitmap( 

4168 self, 

4169 object_store: "BaseObjectStore", 

4170 refs: dict["Ref", "ObjectID"], 

4171 commit_interval: int | None = None, 

4172 progress: Callable[[str], None] | None = None, 

4173 ) -> "PackBitmap": 

4174 """Ensure a bitmap exists for this pack, generating one if needed. 

4175 

4176 Args: 

4177 object_store: Object store to read objects from 

4178 refs: Dictionary of ref names to commit SHAs 

4179 commit_interval: Include every Nth commit in bitmap index 

4180 progress: Optional progress reporting callback 

4181 

4182 Returns: 

4183 PackBitmap instance (either existing or newly generated) 

4184 """ 

4185 from .bitmap import generate_bitmap, write_bitmap 

4186 

4187 # Check if bitmap already exists 

4188 try: 

4189 existing = self.bitmap 

4190 if existing is not None: 

4191 return existing 

4192 except FileNotFoundError: 

4193 pass # No bitmap, we'll generate one 

4194 

4195 # Generate new bitmap 

4196 if progress: 

4197 progress(f"Generating bitmap for {self.name().decode('utf-8')}...\n") 

4198 

4199 pack_bitmap = generate_bitmap( 

4200 self.index, 

4201 object_store, 

4202 refs, 

4203 self.get_stored_checksum(), 

4204 commit_interval=commit_interval, 

4205 progress=progress, 

4206 ) 

4207 

4208 # Write bitmap file 

4209 write_bitmap(self._bitmap_path, pack_bitmap) 

4210 

4211 if progress: 

4212 progress(f"Wrote {self._bitmap_path}\n") 

4213 

4214 # Update cached bitmap 

4215 self._bitmap = pack_bitmap 

4216 

4217 return pack_bitmap 

4218 

4219 @property 

4220 def mmap_size(self) -> int: 

4221 """Return the total mmapped memory usage of this pack. 

4222 

4223 This includes the pack data file and index file sizes, 

4224 but only for components that have been loaded (and thus mmapped). 

4225 """ 

4226 total = 0 

4227 if self._data is not None: 

4228 total += self._data._get_size() 

4229 if self._idx is not None and isinstance(self._idx, FilePackIndex): 

4230 total += self._idx._size 

4231 return total 

4232 

4233 def close(self) -> None: 

4234 """Close the pack file and index.""" 

4235 if self._data is not None: 

4236 self._data.close() 

4237 self._data = None 

4238 if self._idx is not None: 

4239 self._idx.close() 

4240 self._idx = None 

4241 

4242 def __del__(self) -> None: 

4243 """Ensure pack file is closed when Pack is garbage collected.""" 

4244 if self._data is not None or self._idx is not None: 

4245 import warnings 

4246 

4247 warnings.warn( 

4248 f"unclosed Pack {self!r}", ResourceWarning, stacklevel=2, source=self 

4249 ) 

4250 try: 

4251 self.close() 

4252 except Exception: 

4253 # Ignore errors during cleanup 

4254 pass 

4255 

4256 def __enter__(self) -> Self: 

4257 """Enter context manager.""" 

4258 return self 

4259 

4260 def __exit__( 

4261 self, 

4262 type: type | None, 

4263 value: BaseException | None, 

4264 traceback: TracebackType | None, 

4265 ) -> None: 

4266 """Exit context manager.""" 

4267 self.close() 

4268 

4269 def __eq__(self, other: object) -> bool: 

4270 """Check equality with another pack.""" 

4271 if not isinstance(other, Pack): 

4272 return False 

4273 return self.index == other.index 

4274 

4275 def __len__(self) -> int: 

4276 """Number of entries in this pack.""" 

4277 return len(self.index) 

4278 

4279 def __repr__(self) -> str: 

4280 """Return string representation of this pack.""" 

4281 return f"{self.__class__.__name__}({self._basename!r})" 

4282 

4283 def __iter__(self) -> Iterator[ObjectID]: 

4284 """Iterate over all the sha1s of the objects in this pack.""" 

4285 return iter(self.index) 

4286 

4287 def check_length_and_checksum(self) -> None: 

4288 """Sanity check the length and checksum of the pack index and data.""" 

4289 assert len(self.index) == len(self.data), ( 

4290 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)" 

4291 ) 

4292 idx_stored_checksum = self.index.get_pack_checksum() 

4293 data_stored_checksum = self.data.get_stored_checksum() 

4294 if ( 

4295 idx_stored_checksum is not None 

4296 and idx_stored_checksum != data_stored_checksum 

4297 ): 

4298 raise ChecksumMismatch( 

4299 sha_to_hex(RawObjectID(idx_stored_checksum)), 

4300 sha_to_hex(RawObjectID(data_stored_checksum)), 

4301 ) 

4302 

4303 def check(self) -> None: 

4304 """Check the integrity of this pack. 

4305 

4306 Raises: 

4307 ChecksumMismatch: if a checksum for the index or data is wrong 

4308 """ 

4309 self.index.check() 

4310 self.data.check() 

4311 for obj in self.iterobjects(): 

4312 obj.check() 

4313 # TODO: object connectivity checks 

4314 

4315 def get_stored_checksum(self) -> bytes: 

4316 """Return the stored checksum of the pack data.""" 

4317 return self.data.get_stored_checksum() 

4318 

4319 def pack_tuples(self) -> list[tuple[ShaFile, None]]: 

4320 """Return pack tuples for all objects in pack.""" 

4321 return [(o, None) for o in self.iterobjects()] 

4322 

4323 def __contains__(self, sha1: ObjectID | RawObjectID) -> bool: 

4324 """Check whether this pack contains a particular SHA1.""" 

4325 try: 

4326 self.index.object_offset(sha1) 

4327 return True 

4328 except KeyError: 

4329 return False 

4330 

4331 def get_raw(self, sha1: RawObjectID | ObjectID) -> tuple[int, bytes]: 

4332 """Get raw object data by SHA1.""" 

4333 offset = self.index.object_offset(sha1) 

4334 obj_type, obj = self.data.get_object_at(offset) 

4335 type_num, chunks = self.resolve_object(offset, obj_type, obj) 

4336 return type_num, b"".join(chunks) # type: ignore[arg-type] 

4337 

4338 def __getitem__(self, sha1: "ObjectID | RawObjectID") -> ShaFile: 

4339 """Retrieve the specified SHA1.""" 

4340 type, uncomp = self.get_raw(sha1) 

4341 return ShaFile.from_raw_string(type, uncomp, sha=sha1) 

4342 

4343 def iterobjects(self) -> Iterator[ShaFile]: 

4344 """Iterate over the objects in this pack.""" 

4345 return iter( 

4346 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref) 

4347 ) 

4348 

4349 def iterobjects_subset( 

4350 self, shas: Iterable[ObjectID], *, allow_missing: bool = False 

4351 ) -> Iterator[ShaFile]: 

4352 """Iterate over a subset of objects in this pack.""" 

4353 return ( 

4354 uo 

4355 for uo in PackInflater.for_pack_subset( 

4356 self, 

4357 shas, 

4358 allow_missing=allow_missing, 

4359 resolve_ext_ref=self.resolve_ext_ref, 

4360 ) 

4361 if uo.id in shas 

4362 ) 

4363 

4364 def iter_unpacked_subset( 

4365 self, 

4366 shas: Iterable[ObjectID | RawObjectID], 

4367 *, 

4368 include_comp: bool = False, 

4369 allow_missing: bool = False, 

4370 convert_ofs_delta: bool = False, 

4371 ) -> Iterator[UnpackedObject]: 

4372 """Iterate over unpacked objects in subset.""" 

4373 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list) 

4374 ofs: dict[int, bytes] = {} 

4375 todo: set[ObjectID | RawObjectID] = set(shas) 

4376 for unpacked in self.iter_unpacked(include_comp=include_comp): 

4377 sha = unpacked.sha() 

4378 if unpacked.offset is not None: 

4379 ofs[unpacked.offset] = sha 

4380 hexsha = sha_to_hex(RawObjectID(sha)) 

4381 if hexsha in todo: 

4382 if unpacked.pack_type_num == OFS_DELTA: 

4383 assert isinstance(unpacked.delta_base, int) 

4384 assert unpacked.offset is not None 

4385 base_offset = unpacked.offset - unpacked.delta_base 

4386 try: 

4387 unpacked.delta_base = ofs[base_offset] 

4388 except KeyError: 

4389 ofs_pending[base_offset].append(unpacked) 

4390 continue 

4391 else: 

4392 unpacked.pack_type_num = REF_DELTA 

4393 yield unpacked 

4394 todo.remove(hexsha) 

4395 if unpacked.offset is not None: 

4396 for child in ofs_pending.pop(unpacked.offset, []): 

4397 child.pack_type_num = REF_DELTA 

4398 child.delta_base = sha 

4399 yield child 

4400 assert not ofs_pending 

4401 if not allow_missing and todo: 

4402 raise UnresolvedDeltas(list(todo)) 

4403 

4404 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]: 

4405 """Iterate over all unpacked objects in this pack.""" 

4406 ofs_to_entries = { 

4407 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries() 

4408 } 

4409 for unpacked in self.data.iter_unpacked(include_comp=include_comp): 

4410 assert unpacked.offset is not None 

4411 (sha, crc32) = ofs_to_entries[unpacked.offset] 

4412 unpacked._sha = sha 

4413 unpacked.crc32 = crc32 

4414 yield unpacked 

4415 

4416 def keep(self, msg: bytes | None = None) -> str: 

4417 """Add a .keep file for the pack, preventing git from garbage collecting it. 

4418 

4419 Args: 

4420 msg: A message written inside the .keep file; can be used later 

4421 to determine whether or not a .keep file is obsolete. 

4422 Returns: The path of the .keep file, as a string. 

4423 """ 

4424 keepfile_name = f"{self._basename}.keep" 

4425 with GitFile(keepfile_name, "wb") as keepfile: 

4426 if msg: 

4427 keepfile.write(msg) 

4428 keepfile.write(b"\n") 

4429 return keepfile_name 

4430 

4431 def get_ref( 

4432 self, sha: RawObjectID | ObjectID 

4433 ) -> tuple[int | None, int, OldUnpackedObject]: 

4434 """Get the object for a ref SHA, only looking in this pack.""" 

4435 # TODO: cache these results 

4436 try: 

4437 offset = self.index.object_offset(sha) 

4438 except KeyError: 

4439 offset = None 

4440 if offset: 

4441 type, obj = self.data.get_object_at(offset) 

4442 elif self.resolve_ext_ref: 

4443 type, obj = self.resolve_ext_ref(sha) 

4444 else: 

4445 raise KeyError(sha) 

4446 return offset, type, obj 

4447 

4448 def resolve_object( 

4449 self, 

4450 offset: int, 

4451 type: int, 

4452 obj: OldUnpackedObject, 

4453 get_ref: Callable[ 

4454 [RawObjectID | ObjectID], tuple[int | None, int, OldUnpackedObject] 

4455 ] 

4456 | None = None, 

4457 ) -> tuple[int, OldUnpackedObject]: 

4458 """Resolve an object, possibly resolving deltas when necessary. 

4459 

4460 Returns: Tuple with object type and contents. 

4461 """ 

4462 # Walk down the delta chain, building a stack of deltas to reach 

4463 # the requested object. 

4464 base_offset: int | None = offset 

4465 base_type = type 

4466 base_obj = obj 

4467 delta_stack = [] 

4468 while base_type in DELTA_TYPES: 

4469 prev_offset = base_offset 

4470 if get_ref is None: 

4471 get_ref = self.get_ref 

4472 assert isinstance(base_obj, tuple), ( 

4473 f"Expected delta tuple, got {base_obj.__class__.__name__}" 

4474 ) 

4475 if base_type == OFS_DELTA: 

4476 (delta_offset, delta) = base_obj 

4477 # TODO: clean up asserts and replace with nicer error messages 

4478 assert isinstance(delta_offset, int), ( 

4479 f"Expected int, got {delta_offset.__class__}" 

4480 ) 

4481 assert base_offset is not None 

4482 base_offset = base_offset - delta_offset 

4483 base_type, base_obj = self.data.get_object_at(base_offset) 

4484 assert isinstance(base_type, int) 

4485 elif base_type == REF_DELTA: 

4486 (basename, delta) = base_obj 

4487 assert ( 

4488 isinstance(basename, bytes) 

4489 and len(basename) == self.object_format.oid_length 

4490 ) 

4491 base_offset_temp, base_type, base_obj = get_ref(RawObjectID(basename)) 

4492 assert isinstance(base_type, int) 

4493 # base_offset_temp can be None for thin packs (external references) 

4494 base_offset = base_offset_temp 

4495 if base_offset == prev_offset: # object is based on itself 

4496 raise UnresolvedDeltas([basename]) 

4497 delta_stack.append((prev_offset, base_type, delta)) 

4498 

4499 # Now grab the base object (mustn't be a delta) and apply the 

4500 # deltas all the way up the stack. 

4501 chunks = base_obj 

4502 for prev_offset, _delta_type, delta in reversed(delta_stack): 

4503 # Convert chunks to bytes for apply_delta if needed 

4504 if isinstance(chunks, list): 

4505 chunks_bytes = b"".join(chunks) 

4506 elif isinstance(chunks, tuple): 

4507 # For tuple type, second element is the actual data 

4508 _, chunk_data = chunks 

4509 if isinstance(chunk_data, list): 

4510 chunks_bytes = b"".join(chunk_data) 

4511 else: 

4512 chunks_bytes = chunk_data 

4513 else: 

4514 chunks_bytes = chunks 

4515 

4516 # Apply delta and get result as list 

4517 chunks = apply_delta(chunks_bytes, delta) 

4518 

4519 if prev_offset is not None: 

4520 self.data._offset_cache[prev_offset] = base_type, chunks 

4521 return base_type, chunks 

4522 

4523 def entries( 

4524 self, progress: Callable[[int, int], None] | None = None 

4525 ) -> Iterator[PackIndexEntry]: 

4526 """Yield entries summarizing the contents of this pack. 

4527 

4528 Args: 

4529 progress: Progress function, called with current and total 

4530 object count. 

4531 Returns: iterator of tuples with (sha, offset, crc32) 

4532 """ 

4533 return self.data.iterentries( 

4534 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

4535 ) 

4536 

4537 def sorted_entries( 

4538 self, progress: Callable[[int, int], None] | None = None 

4539 ) -> Iterator[PackIndexEntry]: 

4540 """Return entries in this pack, sorted by SHA. 

4541 

4542 Args: 

4543 progress: Progress function, called with current and total 

4544 object count 

4545 Returns: Iterator of tuples with (sha, offset, crc32) 

4546 """ 

4547 return iter( 

4548 self.data.sorted_entries( 

4549 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

4550 ) 

4551 ) 

4552 

4553 def get_unpacked_object( 

4554 self, 

4555 sha: ObjectID | RawObjectID, 

4556 *, 

4557 include_comp: bool = False, 

4558 convert_ofs_delta: bool = True, 

4559 ) -> UnpackedObject: 

4560 """Get the unpacked object for a sha. 

4561 

4562 Args: 

4563 sha: SHA of object to fetch 

4564 include_comp: Whether to include compression data in UnpackedObject 

4565 convert_ofs_delta: Whether to convert offset deltas to ref deltas 

4566 """ 

4567 offset = self.index.object_offset(sha) 

4568 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp) 

4569 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta: 

4570 assert isinstance(unpacked.delta_base, int) 

4571 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base) 

4572 unpacked.pack_type_num = REF_DELTA 

4573 return unpacked 

4574 

4575 

def extend_pack(
    f: BinaryIO,
    object_ids: Set["RawObjectID"],
    get_raw: Callable[["RawObjectID | ObjectID"], tuple[int, bytes]],
    object_format: "ObjectFormat",
    *,
    compression_level: int = -1,
    progress: Callable[[bytes], None] | None = None,
) -> tuple[bytes, list[tuple[RawObjectID, int, int]]]:
    """Append extra objects to an existing pack file and re-seal it.

    The object count in the header is bumped, the pack checksum is
    recomputed over the existing content, the extra objects are written
    where the old trailer started, and a fresh trailer is appended.

    The caller should make sure that object_ids does not contain any objects
    that are already in the pack

    Args:
      f: Seekable pack file opened for reading and writing
      object_ids: Ids of the objects to append
      get_raw: Callable returning (type_num, data) for an object id
      object_format: Supplies the hash function and object id length
      compression_level: Passed through to ``write_pack_object``
      progress: Optional callback receiving ASCII progress messages
    Returns: Tuple of the new pack checksum and a list of
      (object_id, offset, crc32) for the appended objects
    """
    # Read the current object count from the header.
    f.seek(0)
    _pack_version, existing_count = read_pack_header(f.read)

    if object_ids:
        # Rewrite the header with the enlarged object count.
        f.seek(0)
        write_pack_header(f.write, existing_count + len(object_ids))

        # Must flush before reading (http://bugs.python.org/issue3207)
        f.flush()

    # Rescan the rest of the pack, computing the SHA with the new header.
    running_sha = compute_file_sha(
        f, hash_func=object_format.hash_func, end_ofs=-object_format.oid_length
    )

    # Must reposition before writing (http://bugs.python.org/issue3207)
    f.seek(0, os.SEEK_CUR)

    appended = []

    # Complete the pack.
    for position, oid in enumerate(object_ids):
        if progress is not None:
            message = f"writing extra base objects: {position}/{len(object_ids)}\r"
            progress(message.encode("ascii"))
        assert len(oid) == object_format.oid_length
        type_num, raw = get_raw(oid)
        start = f.tell()
        checksum = write_pack_object(
            f.write,
            type_num,
            [raw],  # write_pack_object takes the content as a list of chunks
            sha=running_sha,
            compression_level=compression_level,
            object_format=object_format,
        )
        appended.append((oid, start, checksum))

    pack_sha = running_sha.digest()
    f.write(pack_sha)
    return pack_sha, appended

4632 

4633 

# Prefer the implementations from the optional ``dulwich._pack`` extension
# module; if it cannot be imported, the definitions above stay in effect.
try:
    from dulwich._pack import (  # type: ignore
        apply_delta,
        bisect_find_sha,
    )
except ImportError:
    pass

# Try to import the Rust version of create_delta
try:
    from dulwich._pack import create_delta as _create_delta_rs
except ImportError:
    pass
else:
    # Wrap the Rust version to match the Python API: the extension function's
    # result is yielded as a single chunk so callers get an Iterator[bytes].
    def _create_delta_rs_wrapper(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
        """Wrapper for Rust create_delta to match Python API."""
        yield _create_delta_rs(base_buf, target_buf)

    create_delta = _create_delta_rs_wrapper