Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1816 statements  

1# pack.py -- For dealing with packed git objects. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Classes for dealing with packed git objects. 

24 

25A pack is a compact representation of a bunch of objects, stored 

26using deltas where possible. 

27 

28They have two parts, the pack file, which stores the data, and an index 

29that tells you where the data is. 

30 

31To find an object you look in all of the index files 'til you find a 

32match for the object name. You then use the pointer got from this as 

33a pointer in to the corresponding packfile. 

34""" 

35 

36__all__ = [ 

37 "DEFAULT_PACK_DELTA_WINDOW_SIZE", 

38 "DEFAULT_PACK_INDEX_VERSION", 

39 "DELTA_TYPES", 

40 "OFS_DELTA", 

41 "PACK_SPOOL_FILE_MAX_SIZE", 

42 "REF_DELTA", 

43 "DeltaChainIterator", 

44 "FilePackIndex", 

45 "MemoryPackIndex", 

46 "ObjectContainer", 

47 "Pack", 

48 "PackChunkGenerator", 

49 "PackData", 

50 "PackFileDisappeared", 

51 "PackHint", 

52 "PackIndex", 

53 "PackIndex1", 

54 "PackIndex2", 

55 "PackIndex3", 

56 "PackIndexEntry", 

57 "PackIndexer", 

58 "PackInflater", 

59 "PackStreamCopier", 

60 "PackStreamReader", 

61 "PackedObjectContainer", 

62 "SHA1Reader", 

63 "SHA1Writer", 

64 "UnpackedObject", 

65 "UnpackedObjectIterator", 

66 "UnpackedObjectStream", 

67 "UnresolvedDeltas", 

68 "apply_delta", 

69 "bisect_find_sha", 

70 "chunks_length", 

71 "compute_file_sha", 

72 "deltas_from_sorted_objects", 

73 "deltify_pack_objects", 

74 "extend_pack", 

75 "find_reusable_deltas", 

76 "full_unpacked_object", 

77 "generate_unpacked_objects", 

78 "iter_sha1", 

79 "load_pack_index", 

80 "load_pack_index_file", 

81 "obj_sha", 

82 "pack_header_chunks", 

83 "pack_object_chunks", 

84 "pack_object_header", 

85 "pack_objects_to_data", 

86 "read_pack_header", 

87 "read_zlib_chunks", 

88 "sort_objects_for_delta", 

89 "take_msb_bytes", 

90 "unpack_object", 

91 "verify_and_read", 

92 "write_pack", 

93 "write_pack_data", 

94 "write_pack_from_container", 

95 "write_pack_header", 

96 "write_pack_index", 

97 "write_pack_object", 

98 "write_pack_objects", 

99] 

100 

101import binascii 

102from collections import defaultdict, deque 

103from contextlib import suppress 

104from io import BytesIO, UnsupportedOperation 

105 

106try: 

107 from cdifflib import CSequenceMatcher as SequenceMatcher 

108except ModuleNotFoundError: 

109 from difflib import SequenceMatcher 

110 

111import os 

112import struct 

113import sys 

114import warnings 

115import zlib 

116from collections.abc import Callable, Iterable, Iterator, Sequence, Set 

117from hashlib import sha1, sha256 

118from itertools import chain 

119from os import SEEK_CUR, SEEK_END 

120from struct import unpack_from 

121from types import TracebackType 

122from typing import ( 

123 IO, 

124 TYPE_CHECKING, 

125 Any, 

126 BinaryIO, 

127 Generic, 

128 Protocol, 

129 TypeVar, 

130) 

131 

132try: 

133 import mmap 

134except ImportError: 

135 has_mmap = False 

136else: 

137 has_mmap = True 

138 

139if TYPE_CHECKING: 

140 from _hashlib import HASH as HashObject 

141 

142 from .bitmap import PackBitmap 

143 from .commit_graph import CommitGraph 

144 from .object_store import BaseObjectStore 

145 from .ref import Ref 

146 

147# For some reason the above try, except fails to set has_mmap = False for plan9 

148if sys.platform == "Plan9": 

149 has_mmap = False 

150 

151from .errors import ApplyDeltaError, ChecksumMismatch 

152from .file import GitFile, _GitFile 

153from .lru_cache import LRUSizeCache 

154from .object_format import OBJECT_FORMAT_TYPE_NUMS, SHA1, ObjectFormat 

155from .objects import ( 

156 ObjectID, 

157 RawObjectID, 

158 ShaFile, 

159 hex_to_sha, 

160 object_header, 

161 sha_to_hex, 

162) 

163 

164OFS_DELTA = 6 

165REF_DELTA = 7 

166 

167DELTA_TYPES = (OFS_DELTA, REF_DELTA) 

168 

169 

170DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 

171 

172# Keep pack files under 16Mb in memory, otherwise write them out to disk 

173PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 

174 

175# Default pack index version to use when none is specified 

176DEFAULT_PACK_INDEX_VERSION = 2 

177 

178 

179OldUnpackedObject = tuple[bytes | int, list[bytes]] | list[bytes] 

180ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]] 

181ProgressFn = Callable[[int, str], None] 

182PackHint = tuple[int, bytes | None] 

183 

184 

def verify_and_read(
    read_func: Callable[[int], bytes],
    expected_hash: bytes,
    hash_algo: str,
    progress: Callable[[bytes], None] | None = None,
) -> Iterator[bytes]:
    """Buffer a stream, verify its digest, then yield the verified data.

    Incoming bytes are spooled to a temporary file (held in memory for small
    payloads, spilled to disk for large ones) while a running digest is
    computed.  Nothing is yielded until the digest equals ``expected_hash``,
    so corrupted or tampered data never reaches the caller.

    Args:
      read_func: Function to read bytes (like file.read or HTTP response reader)
      expected_hash: Expected hash as hex string bytes (e.g., b'a3b2c1...')
      hash_algo: Hash algorithm name ('sha1' or 'sha256')
      progress: Optional progress callback

    Yields:
      Chunks of verified data (only after hash verification succeeds)

    Raises:
      ValueError: If hash doesn't match or algorithm unsupported
    """
    from tempfile import SpooledTemporaryFile

    from .object_format import OBJECT_FORMATS

    fmt = OBJECT_FORMATS.get(hash_algo)
    if fmt is None:
        raise ValueError(f"Unsupported hash algorithm: {hash_algo}")

    digest = fmt.new_hash()

    with SpooledTemporaryFile(
        max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix="dulwich-verify-"
    ) as spool:
        # Hash the stream while copying it into the spool file, 64KB at a time.
        while chunk := read_func(65536):
            digest.update(chunk)
            spool.write(chunk)

        # Refuse to yield anything unless the digest matches.
        actual = digest.hexdigest().encode("ascii")
        if actual != expected_hash:
            raise ValueError(
                f"hash mismatch: expected {expected_hash.decode('ascii')}, "
                f"got {actual.decode('ascii')}"
            )

        if progress:
            progress(b"Hash verified, processing data\n")

        # Replay the now-trusted spooled data to the caller.
        spool.seek(0)
        while block := spool.read(65536):
            yield block

251 

252 

class UnresolvedDeltas(Exception):
    """Raised when delta objects in a pack could not be resolved."""

    def __init__(self, shas: list[bytes]) -> None:
        """Record which deltas stayed unresolved.

        Args:
          shas: List of SHA hashes for unresolved delta objects
        """
        # Kept as an attribute so callers can report or retry the failures.
        self.shas = shas

263 

264 

class ObjectContainer(Protocol):
    """Protocol for objects that can contain git objects.

    This is a structural (duck-typed) interface; any class providing these
    methods conforms. Implementations are typically object stores.
    """

    def add_object(self, obj: ShaFile) -> None:
        """Add a single object to this object store."""

    def add_objects(
        self,
        objects: Sequence[tuple[ShaFile, str | None]],
        progress: Callable[..., None] | None = None,
    ) -> "Pack | None":
        """Add a set of objects to this object store.

        Args:
          objects: Iterable over a list of (object, path) tuples
          progress: Progress callback for object insertion
        Returns: Optional Pack object of the objects written.
        """

    def __contains__(self, sha1: "ObjectID") -> bool:
        """Check if a hex sha is present."""

    def __getitem__(self, sha1: "ObjectID | RawObjectID") -> ShaFile:
        """Retrieve an object by its (hex or raw) object ID."""

    def get_commit_graph(self) -> "CommitGraph | None":
        """Get the commit graph for this object store.

        Returns:
          CommitGraph object if available, None otherwise
        """
        # Default: no commit-graph support; implementations may override.
        return None

297 

298 

class PackedObjectContainer(ObjectContainer):
    """Container for objects packed in a pack file.

    Extends ObjectContainer with access to raw (possibly delta-compressed)
    pack entries. All methods here are abstract and raise NotImplementedError.
    """

    def get_unpacked_object(
        self, sha1: "ObjectID | RawObjectID", *, include_comp: bool = False
    ) -> "UnpackedObject":
        """Get a raw unresolved object.

        Args:
            sha1: SHA-1 hash of the object
            include_comp: Whether to include compressed data

        Returns:
            UnpackedObject instance
        """
        raise NotImplementedError(self.get_unpacked_object)

    def iterobjects_subset(
        self, shas: Iterable["ObjectID"], *, allow_missing: bool = False
    ) -> Iterator[ShaFile]:
        """Iterate over a subset of objects.

        Args:
            shas: Iterable of object SHAs to retrieve
            allow_missing: If True, skip missing objects

        Returns:
            Iterator of ShaFile objects
        """
        raise NotImplementedError(self.iterobjects_subset)

    def iter_unpacked_subset(
        self,
        shas: Iterable["ObjectID | RawObjectID"],
        *,
        include_comp: bool = False,
        allow_missing: bool = False,
        convert_ofs_delta: bool = True,
    ) -> Iterator["UnpackedObject"]:
        """Iterate over unpacked objects from a subset of SHAs.

        Args:
            shas: Set of object SHAs to retrieve
            include_comp: Include compressed data if True
            allow_missing: If True, skip missing objects
            convert_ofs_delta: If True, convert offset deltas to ref deltas

        Returns:
            Iterator of UnpackedObject instances
        """
        raise NotImplementedError(self.iter_unpacked_subset)

350 

351 

class UnpackedObjectStream:
    """Abstract base for a sized, iterable stream of unpacked objects."""

    def __iter__(self) -> Iterator["UnpackedObject"]:
        """Yield the UnpackedObject instances in this stream."""
        raise NotImplementedError(self.__iter__)

    def __len__(self) -> int:
        """Return how many objects the stream contains."""
        raise NotImplementedError(self.__len__)

362 

363 

364def take_msb_bytes( 

365 read: Callable[[int], bytes], crc32: int | None = None 

366) -> tuple[list[int], int | None]: 

367 """Read bytes marked with most significant bit. 

368 

369 Args: 

370 read: Read function 

371 crc32: Optional CRC32 checksum to update 

372 

373 Returns: 

374 Tuple of (list of bytes read, updated CRC32 or None) 

375 """ 

376 ret: list[int] = [] 

377 while len(ret) == 0 or ret[-1] & 0x80: 

378 b = read(1) 

379 if crc32 is not None: 

380 crc32 = binascii.crc32(b, crc32) 

381 ret.append(ord(b[:1])) 

382 return ret, crc32 

383 

384 

class PackFileDisappeared(Exception):
    """Raised when a pack file unexpectedly disappears."""

    def __init__(self, obj: object) -> None:
        """Remember the object whose access triggered the failure.

        Args:
          obj: The object that triggered the exception
        """
        self.obj = obj

395 

396 

class UnpackedObject:
    """Class encapsulating an object unpacked from a pack file.

    These objects should only be created from within unpack_object. Most
    members start out as empty and are filled in at various points by
    read_zlib_chunks, unpack_object, DeltaChainIterator, etc.

    End users of this object should take care that the function they're getting
    this object from is guaranteed to set the members they need.
    """

    __slots__ = [
        "_sha",  # Cached binary SHA.
        "comp_chunks",  # Compressed object chunks.
        "crc32",  # CRC32.
        "decomp_chunks",  # Decompressed object chunks.
        "decomp_len",  # Decompressed length of this object.
        "delta_base",  # Delta base offset or SHA.
        "hash_func",  # Hash function to use for computing object IDs.
        "obj_chunks",  # Decompressed and delta-resolved chunks.
        "obj_type_num",  # Type of this object.
        "offset",  # Offset in its pack.
        "pack_type_num",  # Type of this object in the pack (may be a delta).
    ]

    obj_type_num: int | None
    obj_chunks: list[bytes] | None
    delta_base: None | bytes | int
    decomp_chunks: list[bytes]
    comp_chunks: list[bytes] | None
    decomp_len: int | None
    crc32: int | None
    offset: int | None
    pack_type_num: int
    _sha: bytes | None
    hash_func: Callable[[], "HashObject"]

    # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be
    # methods of this object.
    def __init__(
        self,
        pack_type_num: int,
        *,
        delta_base: None | bytes | int = None,
        decomp_len: int | None = None,
        crc32: int | None = None,
        sha: bytes | None = None,
        decomp_chunks: list[bytes] | None = None,
        offset: int | None = None,
        hash_func: Callable[[], "HashObject"] = sha1,
    ) -> None:
        """Initialize an UnpackedObject.

        Args:
            pack_type_num: Type number of this object in the pack
            delta_base: Delta base (offset or SHA) if this is a delta object
            decomp_len: Decompressed length of this object
            crc32: CRC32 checksum
            sha: SHA hash of the object
            decomp_chunks: Decompressed chunks
            offset: Offset in the pack file
            hash_func: Hash function to use (defaults to sha1)
        """
        self.offset = offset
        self._sha = sha
        self.pack_type_num = pack_type_num
        self.delta_base = delta_base
        self.comp_chunks = None
        self.decomp_chunks: list[bytes] = decomp_chunks or []
        # If the caller supplied chunks but no explicit length, derive it so
        # decomp_len is always consistent with decomp_chunks.
        if decomp_chunks is not None and decomp_len is None:
            self.decomp_len = sum(map(len, decomp_chunks))
        else:
            self.decomp_len = decomp_len
        self.crc32 = crc32
        self.hash_func = hash_func

        if pack_type_num in DELTA_TYPES:
            # Delta entries: the real type and content are unknown until the
            # delta chain is resolved.
            self.obj_type_num = None
            self.obj_chunks = None
        else:
            # Non-delta entries are already fully resolved; share the chunk
            # list rather than copying it.
            self.obj_type_num = pack_type_num
            self.obj_chunks = self.decomp_chunks
            # NOTE: a redundant re-assignment of self.delta_base (already set
            # above, unconditionally) was removed here.

    def sha(self) -> RawObjectID:
        """Return the binary SHA of this object, computing and caching it."""
        if self._sha is None:
            assert self.obj_type_num is not None and self.obj_chunks is not None
            self._sha = obj_sha(self.obj_type_num, self.obj_chunks, self.hash_func)
        return RawObjectID(self._sha)

    def sha_file(self) -> ShaFile:
        """Return a ShaFile built from the resolved type and chunks."""
        assert self.obj_type_num is not None and self.obj_chunks is not None
        return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks)

    # Only provided for backwards compatibility with code that expects either
    # chunks or a delta tuple.
    def _obj(self) -> OldUnpackedObject:
        """Return the decompressed chunks, or (delta base, delta chunks)."""
        if self.pack_type_num in DELTA_TYPES:
            assert isinstance(self.delta_base, (bytes, int))
            return (self.delta_base, self.decomp_chunks)
        else:
            return self.decomp_chunks

    def __eq__(self, other: object) -> bool:
        """Check equality with another UnpackedObject, slot by slot."""
        if not isinstance(other, UnpackedObject):
            return False
        for slot in self.__slots__:
            if getattr(self, slot) != getattr(other, slot):
                return False
        return True

    def __ne__(self, other: object) -> bool:
        """Check inequality with another UnpackedObject."""
        return not (self == other)

    def __repr__(self) -> str:
        """Return string representation listing every slot value."""
        data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__]
        return "{}({})".format(self.__class__.__name__, ", ".join(data))

520 

521 

522_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance 

523 

524 

def read_zlib_chunks(
    read_some: Callable[[int], bytes],
    unpacked: UnpackedObject,
    include_comp: bool = False,
    buffer_size: int = _ZLIB_BUFSIZE,
) -> bytes:
    """Read zlib data from a buffer.

    This function requires that the buffer have additional data following the
    compressed data, which is guaranteed to be the case for git pack files.

    Args:
      read_some: Read function that returns at least one byte, but may
        return less than the requested size.
      unpacked: An UnpackedObject to write result data to. If its crc32
        attr is not None, the CRC32 of the compressed bytes will be computed
        using this starting CRC32.
        After this function, will have the following attrs set:
        * comp_chunks (if include_comp is True)
        * decomp_chunks
        * decomp_len
        * crc32
      include_comp: If True, include compressed data in the result.
      buffer_size: Size of the read buffer.
    Returns: Leftover unused data from the decompression.

    Raises:
      zlib.error: if a decompression error occurred.
    """
    # The expected decompressed size must be known up front so we can detect
    # truncated or corrupt streams below.
    if unpacked.decomp_len is None or unpacked.decomp_len <= -1:
        raise ValueError("non-negative zlib data stream size expected")
    decomp_obj = zlib.decompressobj()

    comp_chunks = []
    # Append in place so the caller's UnpackedObject accumulates the output.
    decomp_chunks = unpacked.decomp_chunks
    decomp_len = 0
    crc32 = unpacked.crc32

    while True:
        add = read_some(buffer_size)
        if not add:
            raise zlib.error("EOF before end of zlib stream")
        comp_chunks.append(add)
        decomp = decomp_obj.decompress(add)
        decomp_len += len(decomp)
        decomp_chunks.append(decomp)
        unused = decomp_obj.unused_data
        if unused:
            # The zlib stream ended inside this read; the trailing `left`
            # bytes belong to whatever follows in the pack, so exclude them
            # from the CRC and from the stored compressed chunks.
            left = len(unused)
            if crc32 is not None:
                crc32 = binascii.crc32(add[:-left], crc32)
            if include_comp:
                comp_chunks[-1] = add[:-left]
            break
        elif crc32 is not None:
            crc32 = binascii.crc32(add, crc32)
    if crc32 is not None:
        # Normalize to an unsigned 32-bit value.
        crc32 &= 0xFFFFFFFF

    if decomp_len != unpacked.decomp_len:
        raise zlib.error("decompressed data does not match expected size")

    unpacked.crc32 = crc32
    if include_comp:
        unpacked.comp_chunks = comp_chunks
    # Return the bytes read past the end of the zlib stream so the caller can
    # continue parsing the pack from the right position.
    return unused

591 

592 

def iter_sha1(iter: Iterable[bytes]) -> bytes:
    """Return the hexdigest of the SHA1 over a set of names.

    Args:
      iter: Iterator over string objects
    Returns: 40-byte hex sha1 digest
    """
    # NOTE: the parameter name shadows the builtin `iter`; kept for
    # backwards compatibility with keyword callers.
    digest = sha1()
    for name in iter:
        digest.update(name)
    return digest.hexdigest().encode("ascii")

604 

605 

def load_pack_index(
    path: str | os.PathLike[str], object_format: ObjectFormat
) -> "PackIndex":
    """Open an index file on disk and parse it into a PackIndex.

    Args:
      path: Path to the index file
      object_format: Hash algorithm used by the repository
    Returns: A PackIndex loaded from the given path
    """
    with GitFile(path, "rb") as index_file:
        return load_pack_index_file(path, index_file, object_format)

618 

619 

def _load_file_contents(
    f: IO[bytes] | _GitFile, size: int | None = None
) -> tuple[bytes | Any, int]:
    """Load contents from a file, preferring mmap when possible.

    Args:
      f: File-like object to load
      size: Expected size, or None to determine from file
    Returns: Tuple of (contents, size)
    """
    try:
        fd = f.fileno()
    except (UnsupportedOperation, AttributeError):
        fd = None
    if fd is not None:
        if size is None:
            size = os.fstat(fd).st_size
        if has_mmap:
            # Not everything with a file descriptor can be mapped (sockets,
            # for instance); on failure fall through to a plain read.
            with suppress(OSError, ValueError):
                return mmap.mmap(fd, size, access=mmap.ACCESS_READ), size
    data = f.read()
    return data, len(data)

649 

650 

def load_pack_index_file(
    path: str | os.PathLike[str],
    f: IO[bytes] | _GitFile,
    object_format: ObjectFormat,
) -> "PackIndex":
    """Parse a pack index from an already-open file-like object.

    Args:
      path: Path for the index file
      f: File-like object
      object_format: Hash algorithm used by the repository
    Returns: A PackIndex loaded from the given file
    """
    contents, size = _load_file_contents(f)
    # Version 1 indexes have no magic header; v2/v3 start with b"\377tOc".
    if contents[:4] != b"\377tOc":
        return PackIndex1(path, object_format, file=f, contents=contents, size=size)
    (version,) = struct.unpack(b">L", contents[4:8])
    if version == 2:
        return PackIndex2(
            path,
            object_format,
            file=f,
            contents=contents,
            size=size,
        )
    if version == 3:
        return PackIndex3(path, object_format, file=f, contents=contents, size=size)
    raise KeyError(f"Unknown pack index format {version}")

681 

682 

683def bisect_find_sha( 

684 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes] 

685) -> int | None: 

686 """Find a SHA in a data blob with sorted SHAs. 

687 

688 Args: 

689 start: Start index of range to search 

690 end: End index of range to search 

691 sha: Sha to find 

692 unpack_name: Callback to retrieve SHA by index 

693 Returns: Index of the SHA, or None if it wasn't found 

694 """ 

695 assert start <= end 

696 while start <= end: 

697 i = (start + end) // 2 

698 file_sha = unpack_name(i) 

699 if file_sha < sha: 

700 start = i + 1 

701 elif file_sha > sha: 

702 end = i - 1 

703 else: 

704 return i 

705 return None 

706 

707 

708PackIndexEntry = tuple[RawObjectID, int, int | None] 

709 

710 

class PackIndex:
    """An index in to a packfile.

    Given a sha id of an object a pack index can tell you the location in the
    packfile of that object if it has it.
    """

    object_format: "ObjectFormat"

    def __eq__(self, other: object) -> bool:
        """Check equality with another PackIndex.

        Two indexes are equal when they contain exactly the same object
        names in the same order (and therefore the same number of entries).
        """
        if not isinstance(other, PackIndex):
            return False

        # Compare entry-by-entry without zip(): zip() stops at the shorter
        # iterator, so the old implementation wrongly reported indexes of
        # different sizes as equal whenever one's names were a prefix of the
        # other's.
        own = iter(self.iterentries())
        theirs = iter(other.iterentries())
        sentinel = object()
        while True:
            entry1 = next(own, sentinel)
            entry2 = next(theirs, sentinel)
            if entry1 is sentinel or entry2 is sentinel:
                # Equal only if both indexes ran out at the same time.
                return entry1 is entry2
            # Only the object names are compared; offsets/CRCs may differ.
            if entry1[0] != entry2[0]:
                return False

    def __ne__(self, other: object) -> bool:
        """Check if this pack index is not equal to another."""
        return not self.__eq__(other)

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        raise NotImplementedError(self.__len__)

    def __iter__(self) -> Iterator[ObjectID]:
        """Iterate over the hex SHAs in this pack."""
        return map(lambda sha: sha_to_hex(RawObjectID(sha)), self._itersha())

    def iterentries(self) -> Iterator[PackIndexEntry]:
        """Iterate over the entries in this pack index.

        Returns: iterator over tuples with object name, offset in packfile and
            crc32 checksum.
        """
        raise NotImplementedError(self.iterentries)

    def get_pack_checksum(self) -> bytes | None:
        """Return the SHA1 checksum stored for the corresponding packfile.

        Returns: 20-byte binary digest, or None if not available
        """
        raise NotImplementedError(self.get_pack_checksum)

    def object_offset(self, sha: ObjectID | RawObjectID) -> int:
        """Return the offset in to the corresponding packfile for the object.

        Given the name of an object it will return the offset that object
        lives at within the corresponding pack file. If the pack file doesn't
        have the object then None will be returned.
        """
        raise NotImplementedError(self.object_offset)

    def object_sha1(self, index: int) -> bytes:
        """Return the SHA1 corresponding to the index in the pack file."""
        # Linear scan; subclasses with direct offset lookup override this.
        for name, offset, _crc32 in self.iterentries():
            if offset == index:
                return name
        raise KeyError(index)

    def _object_offset(self, sha: bytes) -> int:
        """See object_offset.

        Args:
          sha: A *binary* SHA string. (20 characters long)_
        """
        raise NotImplementedError(self._object_offset)

    def objects_sha1(self) -> bytes:
        """Return the hex SHA1 over all the shas of all objects in this pack.

        Note: This is used for the filename of the pack.
        """
        return iter_sha1(self._itersha())

    def _itersha(self) -> Iterator[bytes]:
        """Yield all the SHA1's of the objects in the index, sorted."""
        raise NotImplementedError(self._itersha)

    def iter_prefix(self, prefix: bytes) -> Iterator[RawObjectID]:
        """Iterate over all SHA1s with the given prefix.

        Args:
          prefix: Binary prefix to match
        Returns: Iterator of matching SHA1s
        """
        # Default implementation for PackIndex classes that don't override
        for sha, _, _ in self.iterentries():
            if sha.startswith(prefix):
                yield RawObjectID(sha)

    def close(self) -> None:
        """Close any open files."""

    def check(self) -> None:
        """Check the consistency of this pack index."""

813 

class MemoryPackIndex(PackIndex):
    """Pack index held entirely in memory, with no backing file."""

    def __init__(
        self,
        entries: list[PackIndexEntry],
        object_format: ObjectFormat,
        pack_checksum: bytes | None = None,
    ) -> None:
        """Create a new MemoryPackIndex.

        Args:
          entries: Sequence of name, idx, crc32 (sorted)
          object_format: Object format used by this index
          pack_checksum: Optional pack checksum
        """
        # Build both lookup directions up front; CRC32 values are only kept
        # in the raw entry list.
        self._by_sha = {name: offset for name, offset, _ in entries}
        self._by_offset = {offset: name for name, offset, _ in entries}
        self._entries = entries
        self._pack_checksum = pack_checksum
        self.object_format = object_format

    def get_pack_checksum(self) -> bytes | None:
        """Return the SHA checksum stored for the corresponding packfile."""
        return self._pack_checksum

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        return len(self._entries)

    def object_offset(self, sha: ObjectID | RawObjectID) -> int:
        """Return the offset for the given SHA.

        Args:
          sha: SHA to look up (binary or hex)
        Returns: Offset in the pack file
        """
        # Hex-length input is converted to its binary form before lookup.
        key: RawObjectID
        if len(sha) == self.object_format.hex_length:
            key = hex_to_sha(ObjectID(sha))
        else:
            key = RawObjectID(sha)
        return self._by_sha[key]

    def object_sha1(self, offset: int) -> bytes:
        """Return the SHA1 for the object at the given offset."""
        return self._by_offset[offset]

    def _itersha(self) -> Iterator[bytes]:
        """Iterate over all SHA1s in the index."""
        return iter(self._by_sha)

    def iterentries(self) -> Iterator[PackIndexEntry]:
        """Iterate over all index entries."""
        return iter(self._entries)

    @classmethod
    def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex":
        """Create a MemoryPackIndex from a PackData object."""
        return cls(
            list(pack_data.sorted_entries()),
            pack_checksum=pack_data.get_stored_checksum(),
            object_format=pack_data.object_format,
        )

    @classmethod
    def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex":
        """Create an in-memory copy of another PackIndex."""
        return cls(
            list(other_index.iterentries()),
            other_index.object_format,
            other_index.get_pack_checksum(),
        )

890 

891 

892class FilePackIndex(PackIndex): 

893 """Pack index that is based on a file. 

894 

895 To do the loop it opens the file, and indexes first 256 4 byte groups 

896 with the first byte of the sha id. The value in the four byte group indexed 

897 is the end of the group that shares the same starting byte. Subtract one 

898 from the starting byte and index again to find the start of the group. 

899 The values are sorted by sha id within the group, so do the math to find 

900 the start and end offset and then bisect in to find if the value is 

901 present. 

902 """ 

903 

904 _fan_out_table: list[int] 

905 _file: IO[bytes] | _GitFile 

906 

    def __init__(
        self,
        filename: str | os.PathLike[str],
        file: IO[bytes] | _GitFile | None = None,
        contents: "bytes | mmap.mmap | None" = None,
        size: int | None = None,
    ) -> None:
        """Create a pack index object.

        Provide it with the name of the index file to consider, and it will map
        it whenever required.

        Args:
          filename: Path to the index file on disk
          file: Optionally, an already-open file object for that path; if
            None, the file is opened here.
          contents: Optionally, the (possibly mmap'd) file contents; if None,
            they are loaded from the file.
          size: Expected size of the contents, if known
        """
        self._filename = filename
        # Take the size now, so it can be checked each time we map the file to
        # ensure that it hasn't changed.
        if file is None:
            self._file = GitFile(filename, "rb")
        else:
            self._file = file
        if contents is None:
            # Loads via mmap when possible, falling back to a full read.
            self._contents, self._size = _load_file_contents(self._file, size)
        else:
            self._contents = contents
            self._size = size if size is not None else len(contents)

931 

    @property
    def path(self) -> str:
        """Return the path to this index file as a plain string."""
        return os.fspath(self._filename)

936 

937 def __eq__(self, other: object) -> bool: 

938 """Check equality with another FilePackIndex.""" 

939 # Quick optimization: 

940 if ( 

941 isinstance(other, FilePackIndex) 

942 and self._fan_out_table != other._fan_out_table 

943 ): 

944 return False 

945 

946 return super().__eq__(other) 

947 

948 def close(self) -> None: 

949 """Close the underlying file and any mmap.""" 

950 self._file.close() 

951 close_fn = getattr(self._contents, "close", None) 

952 if close_fn is not None: 

953 close_fn() 

954 

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        # The last fan-out entry is the count of all objects in the index.
        return self._fan_out_table[-1]

958 

    def _unpack_entry(self, i: int) -> PackIndexEntry:
        """Unpack the i-th entry in the index file.

        Args:
          i: Zero-based entry index
        Returns: Tuple with object name (SHA), offset in pack file and CRC32
            checksum (if known).
        """
        raise NotImplementedError(self._unpack_entry)

966 

    def _unpack_name(self, i: int) -> bytes:
        """Unpack the i-th name (binary SHA) from the index file."""
        raise NotImplementedError(self._unpack_name)

970 

    def _unpack_offset(self, i: int) -> int:
        """Unpack the i-th object offset from the index file."""
        raise NotImplementedError(self._unpack_offset)

974 

    def _unpack_crc32_checksum(self, i: int) -> int | None:
        """Unpack the crc32 checksum for the ith object from the index file."""
        raise NotImplementedError(self._unpack_crc32_checksum)

978 

979 def _itersha(self) -> Iterator[bytes]: 

980 """Iterate over all SHA1s in the index.""" 

981 for i in range(len(self)): 

982 yield self._unpack_name(i) 

983 

984 def iterentries(self) -> Iterator[PackIndexEntry]: 

985 """Iterate over the entries in this pack index. 

986 

987 Returns: iterator over tuples with object name, offset in packfile and 

988 crc32 checksum. 

989 """ 

990 for i in range(len(self)): 

991 yield self._unpack_entry(i) 

992 

993 def _read_fan_out_table(self, start_offset: int) -> list[int]: 

994 """Read the fan-out table from the index. 

995 

996 The fan-out table contains 256 entries mapping first byte values 

997 to the number of objects with SHA1s less than or equal to that byte. 

998 

999 Args: 

1000 start_offset: Offset in the file where the fan-out table starts 

1001 Returns: List of 256 integers 

1002 """ 

1003 ret = [] 

1004 for i in range(0x100): 

1005 fanout_entry = self._contents[ 

1006 start_offset + i * 4 : start_offset + (i + 1) * 4 

1007 ] 

1008 ret.append(struct.unpack(">L", fanout_entry)[0]) 

1009 return ret 

1010 

1011 def check(self) -> None: 

1012 """Check that the stored checksum matches the actual checksum.""" 

1013 actual = self.calculate_checksum() 

1014 stored = self.get_stored_checksum() 

1015 if actual != stored: 

1016 raise ChecksumMismatch(stored, actual) 

1017 

1018 def calculate_checksum(self) -> bytes: 

1019 """Calculate the SHA1 checksum over this pack index. 

1020 

1021 Returns: This is a 20-byte binary digest 

1022 """ 

1023 return sha1(self._contents[:-20]).digest() 

1024 

1025 def get_pack_checksum(self) -> bytes: 

1026 """Return the SHA1 checksum stored for the corresponding packfile. 

1027 

1028 Returns: 20-byte binary digest 

1029 """ 

1030 return bytes(self._contents[-40:-20]) 

1031 

1032 def get_stored_checksum(self) -> bytes: 

1033 """Return the SHA1 checksum stored for this index. 

1034 

1035 Returns: 20-byte binary digest 

1036 """ 

1037 return bytes(self._contents[-20:]) 

1038 

1039 def object_offset(self, sha: ObjectID | RawObjectID) -> int: 

1040 """Return the offset in to the corresponding packfile for the object. 

1041 

1042 Given the name of an object it will return the offset that object 

1043 lives at within the corresponding pack file. If the pack file doesn't 

1044 have the object then None will be returned. 

1045 """ 

1046 lookup_sha: RawObjectID 

1047 if len(sha) == self.object_format.hex_length: # hex string 

1048 lookup_sha = hex_to_sha(ObjectID(sha)) 

1049 else: 

1050 lookup_sha = RawObjectID(sha) 

1051 try: 

1052 return self._object_offset(lookup_sha) 

1053 except ValueError as exc: 

1054 closed = getattr(self._contents, "closed", None) 

1055 if closed in (None, True): 

1056 raise PackFileDisappeared(self) from exc 

1057 raise 

1058 

1059 def _object_offset(self, sha: bytes) -> int: 

1060 """See object_offset. 

1061 

1062 Args: 

1063 sha: A *binary* SHA string. (20 characters long)_ 

1064 """ 

1065 hash_size = getattr(self, "hash_size", 20) # Default to SHA1 for v1 

1066 assert len(sha) == hash_size 

1067 idx = ord(sha[:1]) 

1068 if idx == 0: 

1069 start = 0 

1070 else: 

1071 start = self._fan_out_table[idx - 1] 

1072 end = self._fan_out_table[idx] 

1073 i = bisect_find_sha(start, end, sha, self._unpack_name) 

1074 if i is None: 

1075 raise KeyError(sha) 

1076 return self._unpack_offset(i) 

1077 

1078 def iter_prefix(self, prefix: bytes) -> Iterator[RawObjectID]: 

1079 """Iterate over all SHA1s with the given prefix.""" 

1080 start = ord(prefix[:1]) 

1081 if start == 0: 

1082 start = 0 

1083 else: 

1084 start = self._fan_out_table[start - 1] 

1085 end = ord(prefix[:1]) + 1 

1086 if end == 0x100: 

1087 end = len(self) 

1088 else: 

1089 end = self._fan_out_table[end] 

1090 assert start <= end 

1091 started = False 

1092 for i in range(start, end): 

1093 name: bytes = self._unpack_name(i) 

1094 if name.startswith(prefix): 

1095 yield RawObjectID(name) 

1096 started = True 

1097 elif started: 

1098 break 

1099 

1100 

class PackIndex1(FilePackIndex):
    """Version 1 Pack Index file."""

    object_format = SHA1

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Create a v1 pack index reader.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        """
        super().__init__(filename, file, contents, size)

        # The v1 on-disk format predates SHA-256 support.
        if object_format != SHA1:
            raise AssertionError(
                f"PackIndex1 only supports SHA1, not {object_format.name}"
            )

        self.object_format = object_format
        self.version = 1
        # v1 layout: fan-out table first, then (offset, name) entry records.
        self._fan_out_table = self._read_fan_out_table(0)
        self.hash_size = self.object_format.oid_length
        self._entry_size = 4 + self.hash_size

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, None]:
        """Return (raw name, pack offset, None) for entry i."""
        entry_start = (0x100 * 4) + (i * self._entry_size)
        (pack_offset,) = unpack_from(">L", self._contents, entry_start)
        digest = self._contents[entry_start + 4 : entry_start + 4 + self.hash_size]
        return (RawObjectID(digest), pack_offset, None)

    def _unpack_name(self, i: int) -> bytes:
        """Return the raw binary name of entry i (follows its 4-byte offset)."""
        name_start = (0x100 * 4) + (i * self._entry_size) + 4
        return self._contents[name_start : name_start + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Return the 32-bit pack offset stored at the start of entry i."""
        entry_start = (0x100 * 4) + (i * self._entry_size)
        return int(unpack_from(">L", self._contents, entry_start)[0])

    def _unpack_crc32_checksum(self, i: int) -> None:
        """The v1 format does not record per-object CRC32 checksums."""
        return None

1154 

1155 

class PackIndex2(FilePackIndex):
    """Version 2 Pack Index file."""

    object_format = SHA1

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 2 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        """
        super().__init__(filename, file, contents, size)
        self.object_format = object_format
        if self._contents[:4] != b"\377tOc":
            raise AssertionError("Not a v2 pack index file")
        (self.version,) = unpack_from(b">L", self._contents, 4)
        if self.version != 2:
            raise AssertionError(f"Version was {self.version}")
        self._fan_out_table = self._read_fan_out_table(8)
        self.hash_size = self.object_format.oid_length
        # v2 layout after magic+version: fan-out table, then separate tables
        # for names, CRC32s, 4-byte offsets, and 8-byte large offsets.
        self._name_table_offset = 8 + 0x100 * 4
        self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self)
        self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
        self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
            self
        )

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, int]:
        """Return (raw name, pack offset, crc32) for entry i."""
        return (
            RawObjectID(self._unpack_name(i)),
            self._unpack_offset(i),
            self._unpack_crc32_checksum(i),
        )

    def _unpack_name(self, i: int) -> bytes:
        """Return the raw binary name of entry i from the name table."""
        offset = self._name_table_offset + i * self.hash_size
        return self._contents[offset : offset + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Return the pack offset of entry i, resolving 64-bit large offsets."""
        offset = self._pack_offset_table_offset + i * 4
        offset_val = int(unpack_from(">L", self._contents, offset)[0])
        # The MSB flags an index into the 64-bit large-offset table.
        if offset_val & (2**31):
            offset = (
                self._pack_offset_largetable_offset + (offset_val & (2**31 - 1)) * 8
            )
            offset_val = int(unpack_from(">Q", self._contents, offset)[0])
        return offset_val

    def _unpack_crc32_checksum(self, i: int) -> int:
        """Return the CRC32 recorded for entry i in the CRC32 table."""
        return int(
            unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
        )

    def get_pack_checksum(self) -> bytes:
        """Return the checksum stored for the corresponding packfile.

        Returns: binary digest (size depends on hash algorithm)
        """
        # Index ends with: pack_checksum + index_checksum
        # Each checksum is hash_size bytes
        checksum_size = self.hash_size
        return bytes(self._contents[-2 * checksum_size : -checksum_size])

    def get_stored_checksum(self) -> bytes:
        """Return the checksum stored for this index.

        Returns: binary digest (size depends on hash algorithm)
        """
        checksum_size = self.hash_size
        return bytes(self._contents[-checksum_size:])

    def calculate_checksum(self) -> bytes:
        """Calculate the checksum over this pack index.

        Returns: binary digest (size depends on hash algorithm)
        """
        # Use the repository's configured hash algorithm directly instead of
        # re-deriving it from the digest length (consistent with
        # PackData.calculate_checksum).
        return self.object_format.hash_func(
            self._contents[: -self.hash_size]
        ).digest()

1252 

1253 

class PackIndex3(FilePackIndex):
    """Version 3 Pack Index file.

    Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes).
    """

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 3 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        """
        super().__init__(filename, file, contents, size)
        if self._contents[:4] != b"\377tOc":
            raise AssertionError("Not a v3 pack index file")
        (self.version,) = unpack_from(b">L", self._contents, 4)
        if self.version != 3:
            raise AssertionError(f"Version was {self.version}")

        # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
        (self.hash_format,) = unpack_from(b">L", self._contents, 8)
        file_object_format = OBJECT_FORMAT_TYPE_NUMS[self.hash_format]

        # Verify provided object_format matches what's in the file
        if object_format != file_object_format:
            raise AssertionError(
                f"Object format mismatch: provided {object_format.name}, "
                f"but file contains {file_object_format.name}"
            )

        self.object_format = object_format
        self.hash_size = self.object_format.oid_length

        # Read length of shortened object names
        (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12)

        # Calculate offsets based on variable hash size
        self._fan_out_table = self._read_fan_out_table(
            16
        )  # After header (4 + 4 + 4 + 4)
        self._name_table_offset = 16 + 0x100 * 4
        self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self)
        self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
        self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
            self
        )

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, int]:
        """Return (raw name, pack offset, crc32) for entry i."""
        return (
            RawObjectID(self._unpack_name(i)),
            self._unpack_offset(i),
            self._unpack_crc32_checksum(i),
        )

    def _unpack_name(self, i: int) -> bytes:
        """Return the raw binary name of entry i from the name table."""
        offset = self._name_table_offset + i * self.hash_size
        return self._contents[offset : offset + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Return the pack offset of entry i, resolving 64-bit large offsets."""
        offset_pos = self._pack_offset_table_offset + i * 4
        offset = unpack_from(">L", self._contents, offset_pos)[0]
        assert isinstance(offset, int)
        # The MSB flags an index into the 64-bit large-offset table.
        if offset & (2**31):
            large_offset_pos = (
                self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
            )
            offset = unpack_from(">Q", self._contents, large_offset_pos)[0]
            assert isinstance(offset, int)
        return offset

    def _unpack_crc32_checksum(self, i: int) -> int:
        """Return the CRC32 recorded for entry i in the CRC32 table."""
        result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
        assert isinstance(result, int)
        return result

    def get_pack_checksum(self) -> bytes:
        """Return the checksum stored for the corresponding packfile.

        Returns: binary digest (size depends on hash algorithm)
        """
        # The base-class implementation assumes 20-byte SHA-1 digests; a v3
        # index may use SHA-256, so slice by the actual hash size.
        checksum_size = self.hash_size
        return bytes(self._contents[-2 * checksum_size : -checksum_size])

    def get_stored_checksum(self) -> bytes:
        """Return the checksum stored for this index.

        Returns: binary digest (size depends on hash algorithm)
        """
        return bytes(self._contents[-self.hash_size :])

    def calculate_checksum(self) -> bytes:
        """Calculate the checksum over this pack index.

        Returns: binary digest (size depends on hash algorithm)
        """
        # Use the repository's configured hash algorithm; the inherited
        # implementation hard-codes SHA-1 and a 20-byte trailer.
        return self.object_format.hash_func(
            self._contents[: -self.hash_size]
        ).digest()

1339 

1340 

def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]:
    """Read the header of a pack file.

    Args:
      read: Read function
    Returns: Tuple of (pack version, number of objects).
    Raises:
      AssertionError: if no data is available, the magic bytes are not
        b"PACK", or the version is not 2 or 3. (The previous docstring
        incorrectly claimed (None, None) was returned on empty input.)
    """
    header = read(12)
    if not header:
        raise AssertionError("file too short to contain pack")
    if header[:4] != b"PACK":
        raise AssertionError(f"Invalid pack header {header!r}")
    (version,) = unpack_from(b">L", header, 4)
    if version not in (2, 3):
        raise AssertionError(f"Version was {version}")
    (num_objects,) = unpack_from(b">L", header, 8)
    return (version, num_objects)

1359 

1360 

1361def chunks_length(chunks: bytes | Iterable[bytes]) -> int: 

1362 """Get the total length of a sequence of chunks. 

1363 

1364 Args: 

1365 chunks: Either a single bytes object or an iterable of bytes 

1366 Returns: Total length in bytes 

1367 """ 

1368 if isinstance(chunks, bytes): 

1369 return len(chunks) 

1370 else: 

1371 return sum(map(len, chunks)) 

1372 

1373 

def unpack_object(
    read_all: Callable[[int], bytes],
    hash_func: Callable[[], "HashObject"],
    read_some: Callable[[int], bytes] | None = None,
    compute_crc32: bool = False,
    include_comp: bool = False,
    zlib_bufsize: int = _ZLIB_BUFSIZE,
) -> tuple[UnpackedObject, bytes]:
    """Unpack a Git object.

    Args:
      read_all: Read function that blocks until the number of requested
        bytes are read.
      hash_func: Hash function to use for computing object IDs.
      read_some: Read function that returns at least one byte, but may not
        return the number of bytes requested.
      compute_crc32: If True, compute the CRC32 of the compressed data. If
        False, the returned CRC32 will be None.
      include_comp: If True, include compressed data in the result.
      zlib_bufsize: An optional buffer size for zlib operations.
    Returns: A tuple of (unpacked, unused), where unused is the unused data
      leftover from decompression, and unpacked in an UnpackedObject with
      the following attrs set:

      * obj_chunks (for non-delta types)
      * pack_type_num
      * delta_base (for delta types)
      * comp_chunks (if include_comp is True)
      * decomp_chunks
      * decomp_len
      * crc32 (if compute_crc32 is True)
    """
    if read_some is None:
        read_some = read_all
    # crc32=0 is the running-CRC seed; None disables CRC tracking entirely.
    if compute_crc32:
        crc32 = 0
    else:
        crc32 = None

    # Object header: a varint whose first byte packs the type number in
    # bits 4-6 and the low 4 bits of the size; each continuation byte
    # contributes 7 more size bits.
    raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
    type_num = (raw[0] >> 4) & 0x07
    size = raw[0] & 0x0F
    for i, byte in enumerate(raw[1:]):
        size += (byte & 0x7F) << ((i * 7) + 4)

    delta_base: int | bytes | None
    raw_base = len(raw)
    if type_num == OFS_DELTA:
        # Offset deltas encode a backwards distance to the base object
        # using git's offset encoding: each continuation byte adds 1
        # before shifting, so distances have a unique representation.
        raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
        raw_base += len(raw)
        if raw[-1] & 0x80:
            raise AssertionError
        delta_base_offset = raw[0] & 0x7F
        for byte in raw[1:]:
            delta_base_offset += 1
            delta_base_offset <<= 7
            delta_base_offset += byte & 0x7F
        delta_base = delta_base_offset
    elif type_num == REF_DELTA:
        # Reference deltas name their base object by its raw hash digest.
        # Determine hash size from hash_func
        hash_size = len(hash_func().digest())
        delta_base_obj = read_all(hash_size)
        if crc32 is not None:
            crc32 = binascii.crc32(delta_base_obj, crc32)
        delta_base = delta_base_obj
        raw_base += hash_size
    else:
        delta_base = None

    unpacked = UnpackedObject(
        type_num,
        delta_base=delta_base,
        decomp_len=size,
        crc32=crc32,
        hash_func=hash_func,
    )
    # The payload is a zlib stream; read_zlib_chunks returns any bytes it
    # over-read past the end of the stream so the caller can re-buffer them.
    unused = read_zlib_chunks(
        read_some,
        unpacked,
        buffer_size=zlib_bufsize,
        include_comp=include_comp,
    )
    return unpacked, unused

1457 

1458 

def _compute_object_size(value: tuple[int, Any]) -> int:
    """Compute the size of a unresolved object for use with LRUSizeCache."""
    type_num, obj = value
    # Delta entries are stored as (base, delta_chunks); only the delta
    # payload occupies cache space.
    if type_num in DELTA_TYPES:
        return chunks_length(obj[1])
    return chunks_length(obj)

1465 

1466 

class PackStreamReader:
    """Class to read a pack stream.

    The pack is read from a ReceivableProtocol using read() or recv() as
    appropriate.
    """

    def __init__(
        self,
        hash_func: Callable[[], "HashObject"],
        read_all: Callable[[int], bytes],
        read_some: Callable[[int], bytes] | None = None,
        zlib_bufsize: int = _ZLIB_BUFSIZE,
    ) -> None:
        """Initialize pack stream reader.

        Args:
          hash_func: Hash function to use for computing object IDs
          read_all: Function to read all requested bytes
          read_some: Function to read some bytes (optional)
          zlib_bufsize: Buffer size for zlib decompression
        """
        self.read_all = read_all
        if read_some is None:
            self.read_some = read_all
        else:
            self.read_some = read_some
        self.hash_func = hash_func
        # Running hash over everything read except the trailing checksum.
        self.sha = hash_func()
        self._hash_size = len(hash_func().digest())
        # Total bytes consumed from the underlying stream so far.
        self._offset = 0
        # Bytes read from the stream but not yet consumed by the caller.
        self._rbuf = BytesIO()
        # trailer is a deque to avoid memory allocation on small reads
        self._trailer: deque[int] = deque()
        self._zlib_bufsize = zlib_bufsize

    def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
        """Read up to size bytes using the given callback.

        As a side effect, update the verifier's hash (excluding the last
        hash_size bytes read, which is the pack checksum).

        Args:
          read: The read callback to read from.
          size: The maximum number of bytes to read; the particular
            behavior is callback-specific.
        Returns: Bytes read
        """
        data = read(size)

        # maintain a trailer of the last hash_size bytes we've read
        n = len(data)
        self._offset += n
        tn = len(self._trailer)
        if n >= self._hash_size:
            # The new data alone covers the trailer: flush the whole old
            # trailer into the hash, keep the last hash_size bytes of data.
            to_pop = tn
            to_add = self._hash_size
        else:
            # Cap the trailer at hash_size bytes by popping the overflow
            # from its left end into the hash.
            to_pop = max(n + tn - self._hash_size, 0)
            to_add = n
        self.sha.update(
            bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)]))
        )
        self._trailer.extend(data[-to_add:])

        # hash everything but the trailer
        self.sha.update(data[:-to_add])
        return data

    def _buf_len(self) -> int:
        # Number of unread bytes remaining in the internal buffer; restores
        # the buffer position after measuring.
        buf = self._rbuf
        start = buf.tell()
        buf.seek(0, SEEK_END)
        end = buf.tell()
        buf.seek(start)
        return end - start

    @property
    def offset(self) -> int:
        """Return current offset in the stream."""
        # Bytes pulled off the wire minus those still buffered locally.
        return self._offset - self._buf_len()

    def read(self, size: int) -> bytes:
        """Read, blocking until size bytes are read."""
        buf_len = self._buf_len()
        if buf_len >= size:
            return self._rbuf.read(size)
        # Drain the buffer, then block for the remainder from the stream.
        buf_data = self._rbuf.read()
        self._rbuf = BytesIO()
        return buf_data + self._read(self.read_all, size - buf_len)

    def recv(self, size: int) -> bytes:
        """Read up to size bytes, blocking until one byte is read."""
        buf_len = self._buf_len()
        if buf_len:
            # Serve from the buffer first; may return fewer than size bytes.
            data = self._rbuf.read(size)
            if size >= buf_len:
                self._rbuf = BytesIO()
            return data
        return self._read(self.read_some, size)

    def __len__(self) -> int:
        """Return the number of objects in this pack."""
        # Only available once read_objects() has parsed the pack header.
        return self._num_objects

    def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]:
        """Read the objects in this pack file.

        Args:
          compute_crc32: If True, compute the CRC32 of the compressed
            data. If False, the returned CRC32 will be None.
        Returns: Iterator over UnpackedObjects with the following members set:
            offset
            obj_type_num
            obj_chunks (for non-delta types)
            delta_base (for delta types)
            decomp_chunks
            decomp_len
            crc32 (if compute_crc32 is True)

        Raises:
          ChecksumMismatch: if the checksum of the pack contents does not
            match the checksum in the pack trailer.
          zlib.error: if an error occurred during zlib decompression.
          IOError: if an error occurred writing to the output file.
        """
        _pack_version, self._num_objects = read_pack_header(self.read)

        for _ in range(self._num_objects):
            offset = self.offset
            unpacked, unused = unpack_object(
                self.read,
                self.hash_func,
                read_some=self.recv,
                compute_crc32=compute_crc32,
                zlib_bufsize=self._zlib_bufsize,
            )
            unpacked.offset = offset

            # prepend any unused data to current read buffer
            buf = BytesIO()
            buf.write(unused)
            buf.write(self._rbuf.read())
            buf.seek(0)
            self._rbuf = buf

            yield unpacked

        if self._buf_len() < self._hash_size:
            # If the read buffer is full, then the last read() got the whole
            # trailer off the wire. If not, it means there is still some of the
            # trailer to read. We need to read() all hash_size bytes; N come from the
            # read buffer and (hash_size - N) come from the wire.
            self.read(self._hash_size)

        # The trailer deque now holds exactly the stored pack checksum.
        pack_sha = bytearray(self._trailer)
        if pack_sha != self.sha.digest():
            raise ChecksumMismatch(
                sha_to_hex(RawObjectID(bytes(pack_sha))), self.sha.hexdigest()
            )

1627 

1628 

class PackStreamCopier(PackStreamReader):
    """Class to verify a pack stream as it is being read.

    The pack is read from a ReceivableProtocol using read() or recv() as
    appropriate and written out to the given file-like object.
    """

    def __init__(
        self,
        hash_func: Callable[[], "HashObject"],
        read_all: Callable[[int], bytes],
        read_some: Callable[[int], bytes] | None,
        outfile: IO[bytes],
        delta_iter: "DeltaChainIterator[UnpackedObject] | None" = None,
    ) -> None:
        """Set up the verifying copier.

        Args:
          hash_func: Hash function to use for computing object IDs
          read_all: Read function that blocks until the number of
            requested bytes are read.
          read_some: Read function that returns at least one byte, but may
            not return the number of bytes requested.
          outfile: File-like object to write output through.
          delta_iter: Optional DeltaChainIterator to record deltas as we
            read them.
        """
        super().__init__(hash_func, read_all, read_some=read_some)
        self.outfile = outfile
        self._delta_iter = delta_iter

    def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
        """Tee every chunk the parent reader consumes into the output file."""
        chunk = super()._read(read, size)
        self.outfile.write(chunk)
        return chunk

    def verify(self, progress: Callable[..., None] | None = None) -> None:
        """Verify a pack stream and write it to the output file.

        See PackStreamReader.iterobjects for a list of exceptions this may
        throw.
        """
        count = 0  # stays 0 when read_objects() yields nothing
        for count, unpacked in enumerate(self.read_objects()):
            if self._delta_iter:
                self._delta_iter.record(unpacked)
            if progress is not None:
                progress(
                    f"copying pack entries: {count}/{len(self)}\r".encode("ascii")
                )
        if progress is not None:
            progress(f"copied {count} pack entries\n".encode("ascii"))

1680 

1681 

def obj_sha(
    type: int,
    chunks: bytes | Iterable[bytes],
    hash_func: Callable[[], "HashObject"] = sha1,
) -> bytes:
    """Compute the SHA for a numeric type and object chunks.

    Args:
      type: Object type number
      chunks: Object data chunks
      hash_func: Hash function to use (defaults to sha1)

    Returns:
      Binary hash digest
    """
    hasher = hash_func()
    hasher.update(object_header(type, chunks_length(chunks)))
    # A bytes value is hashed as a single chunk; anything else is treated
    # as an iterable of chunks.
    pieces = (chunks,) if isinstance(chunks, bytes) else chunks
    for piece in pieces:
        hasher.update(piece)
    return hasher.digest()

1705 

1706 

def compute_file_sha(
    f: IO[bytes],
    hash_func: Callable[[], "HashObject"],
    start_ofs: int = 0,
    end_ofs: int = 0,
    buffer_size: int = 1 << 16,
) -> "HashObject":
    """Hash a portion of a file into a new SHA.

    Args:
      f: A file-like object to read from that supports seek().
      hash_func: A callable that returns a new HashObject.
      start_ofs: The offset in the file to start reading at.
      end_ofs: The offset in the file to end reading at, relative to the
        end of the file.
      buffer_size: A buffer size for reading.
    Returns: A new SHA object updated with data read from the file.
    """
    hasher = hash_func()
    # Measure the file so the [start_ofs, length + end_ofs) window can be
    # validated before hashing anything.
    f.seek(0, SEEK_END)
    length = f.tell()
    if start_ofs < 0:
        raise AssertionError(f"start_ofs cannot be negative: {start_ofs}")
    if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length:
        raise AssertionError(
            f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}"
        )
    remaining = length + end_ofs - start_ofs
    f.seek(start_ofs)
    while remaining:
        chunk = f.read(min(remaining, buffer_size))
        hasher.update(chunk)
        remaining -= len(chunk)
    return hasher

1741 

1742 

1743class PackData: 

1744 """The data contained in a packfile. 

1745 

1746 Pack files can be accessed both sequentially for exploding a pack, and 

1747 directly with the help of an index to retrieve a specific object. 

1748 

1749 The objects within are either complete or a delta against another. 

1750 

1751 The header is variable length. If the MSB of each byte is set then it 

1752 indicates that the subsequent byte is still part of the header. 

1753 For the first byte the next MS bits are the type, which tells you the type 

1754 of object, and whether it is a delta. The LS byte is the lowest bits of the 

1755 size. For each subsequent byte the LS 7 bits are the next MS bits of the 

1756 size, i.e. the last byte of the header contains the MS bits of the size. 

1757 

1758 For the complete objects the data is stored as zlib deflated data. 

1759 The size in the header is the uncompressed object size, so to uncompress 

1760 you need to just keep feeding data to zlib until you get an object back, 

1761 or it errors on bad data. This is done here by just giving the complete 

1762 buffer from the start of the deflated object on. This is bad, but until I 

1763 get mmap sorted out it will have to do. 

1764 

1765 Currently there are no integrity checks done. Also no attempt is made to 

1766 try and detect the delta case, or a request for an object at the wrong 

1767 position. It will all just throw a zlib or KeyError. 

1768 """ 

1769 

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | None = None,
        size: int | None = None,
        *,
        delta_window_size: int | None = None,
        window_memory: int | None = None,
        delta_cache_size: int | None = None,
        depth: int | None = None,
        threads: int | None = None,
        big_file_threshold: int | None = None,
    ) -> None:
        """Create a PackData object representing the pack in the given filename.

        The file must exist and stay readable until the object is disposed of.
        It must also stay the same size. It will be mapped whenever needed.

        Currently there is a restriction on the size of the pack as the python
        mmap implementation is flawed.

        Args:
          filename: Path to the pack file on disk.
          object_format: Object format (hash algorithm) of the repository.
          file: Optional already-open binary file object for the pack.
          size: Optional known size of the pack file, in bytes.
          delta_window_size: Optional delta window size for packing.
          window_memory: Optional memory limit for the delta window.
          delta_cache_size: Optional byte budget for the resolved-object
            cache (defaults to 20 MiB when unset).
          depth: Optional maximum delta chain depth.
          threads: Optional number of threads for delta compression.
          big_file_threshold: Optional size above which files are not
            delta-compressed.
        """
        self._filename = filename
        self.object_format = object_format
        self._size = size
        self._header_size = 12
        self.delta_window_size = delta_window_size
        self.window_memory = window_memory
        self.delta_cache_size = delta_cache_size
        self.depth = depth
        self.threads = threads
        self.big_file_threshold = big_file_threshold
        self._file: IO[bytes]

        if file is None:
            self._file = GitFile(self._filename, "rb")
        else:
            self._file = file
        # Validates the PACK magic/version and yields the object count;
        # leaves the file positioned just past the 12-byte header.
        (_version, self._num_objects) = read_pack_header(self._file.read)

        # Use delta_cache_size config if available, otherwise default
        cache_size = delta_cache_size or (1024 * 1024 * 20)
        self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]](
            cache_size, compute_size=_compute_object_size
        )

1815 

1816 @property 

1817 def filename(self) -> str: 

1818 """Get the filename of the pack file. 

1819 

1820 Returns: 

1821 Base filename without directory path 

1822 """ 

1823 return os.path.basename(self._filename) 

1824 

    @property
    def path(self) -> str | os.PathLike[str]:
        """Get the full path of the pack file.

        Returns:
          Full path to the pack file, exactly as given at construction time
        """
        return self._filename

1833 

1834 @classmethod 

1835 def from_file( 

1836 cls, 

1837 file: IO[bytes], 

1838 object_format: ObjectFormat, 

1839 size: int | None = None, 

1840 ) -> "PackData": 

1841 """Create a PackData object from an open file. 

1842 

1843 Args: 

1844 file: Open file object 

1845 object_format: Object format 

1846 size: Optional file size 

1847 

1848 Returns: 

1849 PackData instance 

1850 """ 

1851 return cls(str(file), object_format, file=file, size=size) 

1852 

1853 @classmethod 

1854 def from_path( 

1855 cls, 

1856 path: str | os.PathLike[str], 

1857 object_format: ObjectFormat, 

1858 ) -> "PackData": 

1859 """Create a PackData object from a file path. 

1860 

1861 Args: 

1862 path: Path to the pack file 

1863 object_format: Object format 

1864 

1865 Returns: 

1866 PackData instance 

1867 """ 

1868 return cls(filename=path, object_format=object_format) 

1869 

1870 def close(self) -> None: 

1871 """Close the underlying pack file.""" 

1872 if self._file is not None: 

1873 self._file.close() 

1874 self._file = None # type: ignore 

1875 

1876 def __del__(self) -> None: 

1877 """Ensure pack file is closed when PackData is garbage collected.""" 

1878 if self._file is not None: 

1879 import warnings 

1880 

1881 warnings.warn( 

1882 f"unclosed PackData {self!r}", 

1883 ResourceWarning, 

1884 stacklevel=2, 

1885 source=self, 

1886 ) 

1887 try: 

1888 self.close() 

1889 except Exception: 

1890 # Ignore errors during cleanup 

1891 pass 

1892 

1893 def __enter__(self) -> "PackData": 

1894 """Enter context manager.""" 

1895 return self 

1896 

1897 def __exit__( 

1898 self, 

1899 exc_type: type | None, 

1900 exc_val: BaseException | None, 

1901 exc_tb: TracebackType | None, 

1902 ) -> None: 

1903 """Exit context manager.""" 

1904 self.close() 

1905 

1906 def __eq__(self, other: object) -> bool: 

1907 """Check equality with another object.""" 

1908 if isinstance(other, PackData): 

1909 return self.get_stored_checksum() == other.get_stored_checksum() 

1910 return False 

1911 

1912 def _get_size(self) -> int: 

1913 if self._size is not None: 

1914 return self._size 

1915 self._size = os.path.getsize(self._filename) 

1916 if self._size < self._header_size: 

1917 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})" 

1918 raise AssertionError(errmsg) 

1919 return self._size 

1920 

1921 def __len__(self) -> int: 

1922 """Returns the number of objects in this pack.""" 

1923 return self._num_objects 

1924 

1925 def calculate_checksum(self) -> bytes: 

1926 """Calculate the checksum for this pack. 

1927 

1928 Returns: Binary digest (size depends on hash algorithm) 

1929 """ 

1930 return compute_file_sha( 

1931 self._file, 

1932 hash_func=self.object_format.hash_func, 

1933 end_ofs=-self.object_format.oid_length, 

1934 ).digest() 

1935 

1936 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]: 

1937 """Iterate over unpacked objects in the pack.""" 

1938 self._file.seek(self._header_size) 

1939 

1940 if self._num_objects is None: 

1941 return 

1942 

1943 for _ in range(self._num_objects): 

1944 offset = self._file.tell() 

1945 unpacked, unused = unpack_object( 

1946 self._file.read, 

1947 self.object_format.hash_func, 

1948 compute_crc32=False, 

1949 include_comp=include_comp, 

1950 ) 

1951 unpacked.offset = offset 

1952 yield unpacked 

1953 # Back up over unused data. 

1954 self._file.seek(-len(unused), SEEK_CUR) 

1955 

1956 def iterentries( 

1957 self, 

1958 progress: Callable[[int, int], None] | None = None, 

1959 resolve_ext_ref: ResolveExtRefFn | None = None, 

1960 ) -> Iterator[PackIndexEntry]: 

1961 """Yield entries summarizing the contents of this pack. 

1962 

1963 Args: 

1964 progress: Progress function, called with current and total 

1965 object count. 

1966 resolve_ext_ref: Optional function to resolve external references 

1967 Returns: iterator of tuples with (sha, offset, crc32) 

1968 """ 

1969 num_objects = self._num_objects 

1970 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref) 

1971 for i, result in enumerate(indexer): 

1972 if progress is not None: 

1973 progress(i, num_objects) 

1974 yield result 

1975 

1976 def sorted_entries( 

1977 self, 

1978 progress: Callable[[int, int], None] | None = None, 

1979 resolve_ext_ref: ResolveExtRefFn | None = None, 

1980 ) -> list[tuple[RawObjectID, int, int]]: 

1981 """Return entries in this pack, sorted by SHA. 

1982 

1983 Args: 

1984 progress: Progress function, called with current and total 

1985 object count 

1986 resolve_ext_ref: Optional function to resolve external references 

1987 Returns: Iterator of tuples with (sha, offset, crc32) 

1988 """ 

1989 return sorted( 

1990 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) # type: ignore 

1991 ) 

1992 

1993 def create_index_v1( 

1994 self, 

1995 filename: str, 

1996 progress: Callable[..., None] | None = None, 

1997 resolve_ext_ref: ResolveExtRefFn | None = None, 

1998 ) -> bytes: 

1999 """Create a version 1 file for this data file. 

2000 

2001 Args: 

2002 filename: Index filename. 

2003 progress: Progress report function 

2004 resolve_ext_ref: Optional function to resolve external references 

2005 Returns: Checksum of index file 

2006 """ 

2007 entries = self.sorted_entries( 

2008 progress=progress, resolve_ext_ref=resolve_ext_ref 

2009 ) 

2010 checksum = self.calculate_checksum() 

2011 with GitFile(filename, "wb") as f: 

2012 write_pack_index_v1( 

2013 f, 

2014 entries, 

2015 checksum, 

2016 ) 

2017 return checksum 

2018 

2019 def create_index_v2( 

2020 self, 

2021 filename: str, 

2022 progress: Callable[..., None] | None = None, 

2023 resolve_ext_ref: ResolveExtRefFn | None = None, 

2024 ) -> bytes: 

2025 """Create a version 2 index file for this data file. 

2026 

2027 Args: 

2028 filename: Index filename. 

2029 progress: Progress report function 

2030 resolve_ext_ref: Optional function to resolve external references 

2031 Returns: Checksum of index file 

2032 """ 

2033 entries = self.sorted_entries( 

2034 progress=progress, resolve_ext_ref=resolve_ext_ref 

2035 ) 

2036 with GitFile(filename, "wb") as f: 

2037 return write_pack_index_v2(f, entries, self.calculate_checksum()) 

2038 

2039 def create_index_v3( 

2040 self, 

2041 filename: str, 

2042 progress: Callable[..., None] | None = None, 

2043 resolve_ext_ref: ResolveExtRefFn | None = None, 

2044 hash_format: int | None = None, 

2045 ) -> bytes: 

2046 """Create a version 3 index file for this data file. 

2047 

2048 Args: 

2049 filename: Index filename. 

2050 progress: Progress report function 

2051 resolve_ext_ref: Function to resolve external references 

2052 hash_format: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

2053 Returns: Checksum of index file 

2054 """ 

2055 entries = self.sorted_entries( 

2056 progress=progress, resolve_ext_ref=resolve_ext_ref 

2057 ) 

2058 with GitFile(filename, "wb") as f: 

2059 if hash_format is None: 

2060 hash_format = 1 # Default to SHA-1 

2061 return write_pack_index_v3( 

2062 f, entries, self.calculate_checksum(), hash_format=hash_format 

2063 ) 

2064 

2065 def create_index( 

2066 self, 

2067 filename: str, 

2068 progress: Callable[..., None] | None = None, 

2069 version: int = 2, 

2070 resolve_ext_ref: ResolveExtRefFn | None = None, 

2071 hash_format: int | None = None, 

2072 ) -> bytes: 

2073 """Create an index file for this data file. 

2074 

2075 Args: 

2076 filename: Index filename. 

2077 progress: Progress report function 

2078 version: Index version (1, 2, or 3) 

2079 resolve_ext_ref: Function to resolve external references 

2080 hash_format: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256) 

2081 Returns: Checksum of index file 

2082 """ 

2083 if version == 1: 

2084 return self.create_index_v1( 

2085 filename, progress, resolve_ext_ref=resolve_ext_ref 

2086 ) 

2087 elif version == 2: 

2088 return self.create_index_v2( 

2089 filename, progress, resolve_ext_ref=resolve_ext_ref 

2090 ) 

2091 elif version == 3: 

2092 return self.create_index_v3( 

2093 filename, 

2094 progress, 

2095 resolve_ext_ref=resolve_ext_ref, 

2096 hash_format=hash_format, 

2097 ) 

2098 else: 

2099 raise ValueError(f"unknown index format {version}") 

2100 

2101 def get_stored_checksum(self) -> bytes: 

2102 """Return the expected checksum stored in this pack.""" 

2103 checksum_size = self.object_format.oid_length 

2104 self._file.seek(-checksum_size, SEEK_END) 

2105 return self._file.read(checksum_size) 

2106 

2107 def check(self) -> None: 

2108 """Check the consistency of this pack.""" 

2109 actual = self.calculate_checksum() 

2110 stored = self.get_stored_checksum() 

2111 if actual != stored: 

2112 raise ChecksumMismatch(stored, actual) 

2113 

2114 def get_unpacked_object_at( 

2115 self, offset: int, *, include_comp: bool = False 

2116 ) -> UnpackedObject: 

2117 """Given offset in the packfile return a UnpackedObject.""" 

2118 assert offset >= self._header_size 

2119 self._file.seek(offset) 

2120 unpacked, _ = unpack_object( 

2121 self._file.read, self.object_format.hash_func, include_comp=include_comp 

2122 ) 

2123 unpacked.offset = offset 

2124 return unpacked 

2125 

2126 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]: 

2127 """Given an offset in to the packfile return the object that is there. 

2128 

2129 Using the associated index the location of an object can be looked up, 

2130 and then the packfile can be asked directly for that object using this 

2131 function. 

2132 """ 

2133 try: 

2134 return self._offset_cache[offset] 

2135 except KeyError: 

2136 pass 

2137 unpacked = self.get_unpacked_object_at(offset, include_comp=False) 

2138 return (unpacked.pack_type_num, unpacked._obj()) 

2139 

2140 

# Result type yielded by DeltaChainIterator subclasses (see _result()).
T = TypeVar("T")

2142 

2143 

class DeltaChainIterator(Generic[T]):
    """Abstract iterator over pack data based on delta chains.

    Each object in the pack is guaranteed to be inflated exactly once,
    regardless of how many objects reference it as a delta base. As a result,
    memory usage is proportional to the length of the longest delta chain.

    Subclasses can override _result to define the result type of the iterator.
    By default, results are UnpackedObjects with the following members set:

    * offset
    * obj_type_num
    * obj_chunks
    * pack_type_num
    * delta_base (for delta types)
    * comp_chunks (if _include_comp is True)
    * decomp_chunks
    * decomp_len
    * crc32 (if _compute_crc32 is True)
    """

    # Subclasses flip these to request CRC32 computation and/or retention of
    # the raw compressed chunks while unpacking.
    _compute_crc32 = False
    _include_comp = False

    def __init__(
        self,
        file_obj: IO[bytes] | None,
        hash_func: Callable[[], "HashObject"],
        *,
        resolve_ext_ref: ResolveExtRefFn | None = None,
    ) -> None:
        """Initialize DeltaChainIterator.

        Args:
            file_obj: File object to read pack data from
            hash_func: Hash function to use for computing object IDs
            resolve_ext_ref: Optional function to resolve external references
        """
        self._file = file_obj
        self.hash_func = hash_func
        self._resolve_ext_ref = resolve_ext_ref
        # Deltas waiting on a base identified by pack offset, keyed by that
        # base offset.
        self._pending_ofs: dict[int, list[int]] = defaultdict(list)
        # Deltas waiting on a base identified by raw SHA.
        self._pending_ref: dict[bytes, list[int]] = defaultdict(list)
        # Non-delta objects: (offset, type_num) pairs that serve as chain roots.
        self._full_ofs: list[tuple[int, int]] = []
        # External base objects that were resolved via resolve_ext_ref.
        self._ext_refs: list[RawObjectID] = []

    @classmethod
    def for_pack_data(
        cls, pack_data: PackData, resolve_ext_ref: ResolveExtRefFn | None = None
    ) -> "DeltaChainIterator[T]":
        """Create a DeltaChainIterator from pack data.

        Args:
            pack_data: PackData object to iterate
            resolve_ext_ref: Optional function to resolve external refs

        Returns:
            DeltaChainIterator instance
        """
        walker = cls(
            None, pack_data.object_format.hash_func, resolve_ext_ref=resolve_ext_ref
        )
        walker.set_pack_data(pack_data)
        # Record every object once so the dependency graph is complete before
        # iteration begins.
        for unpacked in pack_data.iter_unpacked(include_comp=False):
            walker.record(unpacked)
        return walker

    @classmethod
    def for_pack_subset(
        cls,
        pack: "Pack",
        shas: Iterable[ObjectID | RawObjectID],
        *,
        allow_missing: bool = False,
        resolve_ext_ref: ResolveExtRefFn | None = None,
    ) -> "DeltaChainIterator[T]":
        """Create a DeltaChainIterator for a subset of objects.

        Args:
            pack: Pack object containing the data
            shas: Iterable of object SHAs to include
            allow_missing: If True, skip missing objects
            resolve_ext_ref: Optional function to resolve external refs

        Returns:
            DeltaChainIterator instance
        """
        walker = cls(
            None, pack.object_format.hash_func, resolve_ext_ref=resolve_ext_ref
        )
        walker.set_pack_data(pack.data)
        todo = set()
        for sha in shas:
            try:
                off = pack.index.object_offset(sha)
            except KeyError:
                if not allow_missing:
                    raise
            else:
                todo.add(off)
        # Transitively pull in delta bases so every requested object's chain
        # can be resolved; `done` prevents revisiting shared bases.
        done = set()
        while todo:
            off = todo.pop()
            unpacked = pack.data.get_unpacked_object_at(off)
            walker.record(unpacked)
            done.add(off)
            base_ofs = None
            if unpacked.pack_type_num == OFS_DELTA:
                assert unpacked.offset is not None
                assert unpacked.delta_base is not None
                assert isinstance(unpacked.delta_base, int)
                # OFS_DELTA stores the base as a backwards distance.
                base_ofs = unpacked.offset - unpacked.delta_base
            elif unpacked.pack_type_num == REF_DELTA:
                # The base may live outside this pack; missing keys are
                # handled later via resolve_ext_ref.
                with suppress(KeyError):
                    assert isinstance(unpacked.delta_base, bytes)
                    base_ofs = pack.index.object_offset(
                        RawObjectID(unpacked.delta_base)
                    )
            if base_ofs is not None and base_ofs not in done:
                todo.add(base_ofs)
        return walker

    def record(self, unpacked: UnpackedObject) -> None:
        """Record an unpacked object for later processing.

        Args:
            unpacked: UnpackedObject to record
        """
        type_num = unpacked.pack_type_num
        offset = unpacked.offset
        assert offset is not None
        if type_num == OFS_DELTA:
            assert unpacked.delta_base is not None
            assert isinstance(unpacked.delta_base, int)
            # Queue this delta under its base's absolute offset.
            base_offset = offset - unpacked.delta_base
            self._pending_ofs[base_offset].append(offset)
        elif type_num == REF_DELTA:
            assert isinstance(unpacked.delta_base, bytes)
            self._pending_ref[unpacked.delta_base].append(offset)
        else:
            # Non-delta object: a root from which chains can be walked.
            self._full_ofs.append((offset, type_num))

    def set_pack_data(self, pack_data: PackData) -> None:
        """Set the pack data for iteration.

        Args:
            pack_data: PackData object to use
        """
        self._file = pack_data._file

    def _walk_all_chains(self) -> Iterator[T]:
        # Walk every chain rooted at a full (non-delta) object first, then
        # chains rooted at external refs.
        for offset, type_num in self._full_ofs:
            yield from self._follow_chain(offset, type_num, None)
        yield from self._walk_ref_chains()
        # Every OFS_DELTA must have been unblocked by now.
        assert not self._pending_ofs, repr(self._pending_ofs)

    def _ensure_no_pending(self) -> None:
        if self._pending_ref:
            raise UnresolvedDeltas(
                [sha_to_hex(RawObjectID(s)) for s in self._pending_ref]
            )

    def _walk_ref_chains(self) -> Iterator[T]:
        if not self._resolve_ext_ref:
            self._ensure_no_pending()
            return

        for base_sha, pending in sorted(self._pending_ref.items()):
            # _follow_chain may pop entries from _pending_ref while we iterate
            # this sorted snapshot; skip any already resolved.
            if base_sha not in self._pending_ref:
                continue
            try:
                type_num, chunks = self._resolve_ext_ref(base_sha)
            except KeyError:
                # Not an external ref, but may depend on one. Either it will
                # get popped via a _follow_chain call, or we will raise an
                # error below.
                continue
            self._ext_refs.append(RawObjectID(base_sha))
            self._pending_ref.pop(base_sha)
            for new_offset in pending:
                yield from self._follow_chain(new_offset, type_num, chunks)  # type: ignore[arg-type]

        self._ensure_no_pending()

    def _result(self, unpacked: UnpackedObject) -> T:
        # Subclass hook: convert a fully-resolved UnpackedObject to the
        # iterator's result type.
        raise NotImplementedError

    def _resolve_object(
        self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None
    ) -> UnpackedObject:
        assert self._file is not None
        self._file.seek(offset)
        unpacked, _ = unpack_object(
            self._file.read,
            self.hash_func,
            read_some=None,
            compute_crc32=self._compute_crc32,
            include_comp=self._include_comp,
        )
        unpacked.offset = offset
        if base_chunks is None:
            # A chain root: the stored type must match what we expect.
            assert unpacked.pack_type_num == obj_type_num
        else:
            # A delta: reconstruct the full object from its base.
            assert unpacked.pack_type_num in DELTA_TYPES
            unpacked.obj_type_num = obj_type_num
            unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks)
        return unpacked

    def _follow_chain(
        self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None
    ) -> Iterator[T]:
        # Unlike PackData.get_object_at, there is no need to cache offsets as
        # this approach by design inflates each object exactly once.
        todo = [(offset, obj_type_num, base_chunks)]
        while todo:
            (offset, obj_type_num, base_chunks) = todo.pop()
            unpacked = self._resolve_object(offset, obj_type_num, base_chunks)
            yield self._result(unpacked)

            assert unpacked.offset is not None
            # Any deltas waiting on this object (by offset or by SHA) are now
            # unblocked and can be resolved using its reconstructed chunks.
            unblocked = chain(
                self._pending_ofs.pop(unpacked.offset, []),
                self._pending_ref.pop(unpacked.sha(), []),
            )
            todo.extend(
                (new_offset, unpacked.obj_type_num, unpacked.obj_chunks)  # type: ignore
                for new_offset in unblocked
            )

    def __iter__(self) -> Iterator[T]:
        """Iterate over objects in the pack."""
        return self._walk_all_chains()

    def ext_refs(self) -> list[RawObjectID]:
        """Return external references resolved during iteration."""
        return self._ext_refs

2380 

2381 

class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]):
    """Delta chain iterator that yield unpacked objects."""

    def _result(self, unpacked: UnpackedObject) -> UnpackedObject:
        """Pass the fully-resolved object through unchanged.

        Args:
            unpacked: The unpacked object

        Returns:
            The same UnpackedObject, untouched
        """
        return unpacked

2395 

2396 

class PackIndexer(DeltaChainIterator[PackIndexEntry]):
    """Delta chain iterator that yields index entries."""

    # CRC32s are needed for the index entries, so ask the base iterator to
    # compute them while unpacking.
    _compute_crc32 = True

    def _result(self, unpacked: UnpackedObject) -> PackIndexEntry:
        """Reduce a resolved object to its (sha, offset, crc32) index entry.

        Args:
            unpacked: The unpacked object

        Returns:
            Tuple of (sha, offset, crc32) for the index
        """
        offset = unpacked.offset
        assert offset is not None
        return unpacked.sha(), offset, unpacked.crc32

2413 

2414 

class PackInflater(DeltaChainIterator[ShaFile]):
    """Delta chain iterator that yields ShaFile objects."""

    def _result(self, unpacked: UnpackedObject) -> ShaFile:
        """Convert a resolved object into a ShaFile.

        Args:
            unpacked: The unpacked object

        Returns:
            ShaFile built from the object's reconstructed data
        """
        return unpacked.sha_file()

2428 

2429 

class SHA1Reader(BinaryIO):
    """Wrapper for file-like object that remembers the SHA1 of its data.

    Every byte returned by read() is fed into a running SHA-1; check_sha()
    then compares that digest against the trailing 20 bytes of the file.
    Write operations are unsupported (this is a read-only wrapper).
    """

    def __init__(self, f: IO[bytes]) -> None:
        """Initialize SHA1Reader.

        Args:
            f: File-like object to wrap
        """
        self.f = f
        self.sha1 = sha1(b"")

    def read(self, size: int = -1) -> bytes:
        """Read bytes and update SHA1.

        Args:
            size: Number of bytes to read, -1 for all

        Returns:
            Bytes read from file
        """
        data = self.f.read(size)
        self.sha1.update(data)
        return data

    def check_sha(self, allow_empty: bool = False) -> None:
        """Check if the SHA1 matches the expected value.

        Args:
            allow_empty: Allow an all-zero (empty) SHA1 hash

        Raises:
            ChecksumMismatch: If SHA1 doesn't match
        """
        # Read the stored 20-byte trailer; everything before it has already
        # been folded into self.sha1 by read().
        stored = self.f.read(20)
        # If git option index.skipHash is set the index will be empty
        if stored != self.sha1.digest() and (
            not allow_empty
            or (
                len(stored) == 20
                and sha_to_hex(RawObjectID(stored))
                != b"0000000000000000000000000000000000000000"
            )
        ):
            raise ChecksumMismatch(
                self.sha1.hexdigest(),
                sha_to_hex(RawObjectID(stored)) if stored else b"",
            )

    def close(self) -> None:
        """Close the underlying file."""
        return self.f.close()

    def tell(self) -> int:
        """Return current file position."""
        return self.f.tell()

    # BinaryIO abstract methods
    def readable(self) -> bool:
        """Check if file is readable."""
        return True

    def writable(self) -> bool:
        """Check if file is writable."""
        return False

    def seekable(self) -> bool:
        """Check if file is seekable."""
        # Fall back to False for wrapped objects without a seekable() method.
        return getattr(self.f, "seekable", lambda: False)()

    def seek(self, offset: int, whence: int = 0) -> int:
        """Seek to position in file.

        Args:
            offset: Position offset
            whence: Reference point (0=start, 1=current, 2=end)

        Returns:
            New file position
        """
        # NOTE: seeking does not rewind the running SHA-1; callers must not
        # mix seek() with check_sha() unless they know what they are doing.
        return self.f.seek(offset, whence)

    def flush(self) -> None:
        """Flush the file buffer."""
        if hasattr(self.f, "flush"):
            self.f.flush()

    def readline(self, size: int = -1) -> bytes:
        """Read a line from the file.

        Args:
            size: Maximum bytes to read

        Returns:
            Line read from file
        """
        # NOTE: bytes read via readline()/readlines() bypass the SHA-1 update.
        return self.f.readline(size)

    def readlines(self, hint: int = -1) -> list[bytes]:
        """Read all lines from the file.

        Args:
            hint: Approximate number of bytes to read

        Returns:
            List of lines
        """
        return self.f.readlines(hint)

    def writelines(self, lines: Iterable[bytes], /) -> None:  # type: ignore[override]
        """Write multiple lines to the file (not supported)."""
        raise UnsupportedOperation("writelines")

    def write(self, data: bytes, /) -> int:  # type: ignore[override]
        """Write data to the file (not supported)."""
        raise UnsupportedOperation("write")

    def __enter__(self) -> "SHA1Reader":
        """Enter context manager."""
        return self

    def __exit__(
        self,
        type: type | None,
        value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Exit context manager and close file."""
        self.close()

    def __iter__(self) -> "SHA1Reader":
        """Return iterator for reading file lines."""
        return self

    def __next__(self) -> bytes:
        """Get next line from file.

        Returns:
            Next line

        Raises:
            StopIteration: When no more lines
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def fileno(self) -> int:
        """Return file descriptor number."""
        return self.f.fileno()

    def isatty(self) -> bool:
        """Check if file is a terminal."""
        return getattr(self.f, "isatty", lambda: False)()

    def truncate(self, size: int | None = None) -> int:
        """Not supported for read-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("truncate")

2593 

2594 

class SHA1Writer(BinaryIO):
    """Wrapper for file-like object that remembers the SHA1 of its data.

    Every byte passed to write() is folded into a running SHA-1; close()
    appends that digest to the file (see write_sha). Read operations are
    unsupported (this is a write-only wrapper).
    """

    def __init__(self, f: BinaryIO | IO[bytes]) -> None:
        """Initialize SHA1Writer.

        Args:
            f: File-like object to wrap
        """
        self.f = f
        # Total number of bytes written, including the trailing digest.
        self.length = 0
        self.sha1 = sha1(b"")
        # Set by close() once the digest has been written out.
        self.digest: bytes | None = None

    def write(self, data: bytes | bytearray | memoryview, /) -> int:  # type: ignore[override]
        """Write data and update SHA1.

        Args:
            data: Data to write

        Returns:
            Number of bytes written
        """
        self.sha1.update(data)
        written = self.f.write(data)
        self.length += written
        return written

    def write_sha(self) -> bytes:
        """Write the SHA1 digest to the file.

        Returns:
            The SHA1 digest bytes
        """
        sha = self.sha1.digest()
        assert len(sha) == 20
        # Written directly via self.f so the digest itself is not hashed.
        self.f.write(sha)
        self.length += len(sha)
        return sha

    def close(self) -> None:
        """Close the pack file and finalize the SHA."""
        self.digest = self.write_sha()
        self.f.close()

    def offset(self) -> int:
        """Get the total number of bytes written.

        Returns:
            Total bytes written
        """
        return self.length

    def tell(self) -> int:
        """Return current file position."""
        return self.f.tell()

    # BinaryIO abstract methods
    def readable(self) -> bool:
        """Check if file is readable."""
        return False

    def writable(self) -> bool:
        """Check if file is writable."""
        return True

    def seekable(self) -> bool:
        """Check if file is seekable."""
        return getattr(self.f, "seekable", lambda: False)()

    def seek(self, offset: int, whence: int = 0) -> int:
        """Seek to position in file.

        Args:
            offset: Position offset
            whence: Reference point (0=start, 1=current, 2=end)

        Returns:
            New file position
        """
        return self.f.seek(offset, whence)

    def flush(self) -> None:
        """Flush the file buffer."""
        if hasattr(self.f, "flush"):
            self.f.flush()

    def readline(self, size: int = -1) -> bytes:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("readline")

    def readlines(self, hint: int = -1) -> list[bytes]:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("readlines")

    def writelines(self, lines: Iterable[bytes], /) -> None:  # type: ignore[override]
        """Write multiple lines to the file.

        Args:
            lines: Iterable of lines to write
        """
        # Route through self.write so each line is hashed and counted.
        for line in lines:
            self.write(line)

    def read(self, size: int = -1) -> bytes:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("read")

    def __enter__(self) -> "SHA1Writer":
        """Enter context manager."""
        return self

    def __exit__(
        self,
        type: type | None,
        value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Exit context manager and close file."""
        # NOTE(review): this closes the raw file WITHOUT writing the SHA
        # trailer (self.f.close(), not self.close()), unlike the otherwise
        # parallel HashWriter.__exit__ which calls self.close(). Confirm
        # whether skipping write_sha() here is intentional.
        self.f.close()

    def __iter__(self) -> "SHA1Writer":
        """Return iterator."""
        return self

    def __next__(self) -> bytes:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("__next__")

    def fileno(self) -> int:
        """Return file descriptor number."""
        return self.f.fileno()

    def isatty(self) -> bool:
        """Check if file is a terminal."""
        return getattr(self.f, "isatty", lambda: False)()

    def truncate(self, size: int | None = None) -> int:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("truncate")

2755 

2756 

class HashWriter(BinaryIO):
    """Wrapper for file-like object that computes hash of its data.

    This is a generic version that works with any hash algorithm.
    Every byte passed to write() is folded into the running hash; close()
    appends the digest to the file (see write_hash). Read operations are
    unsupported (this is a write-only wrapper).
    """

    def __init__(
        self, f: BinaryIO | IO[bytes], hash_func: Callable[[], "HashObject"]
    ) -> None:
        """Initialize HashWriter.

        Args:
            f: File-like object to wrap
            hash_func: Hash function (e.g., sha1, sha256)
        """
        self.f = f
        # Total number of bytes written, including the trailing digest.
        self.length = 0
        self.hash_obj = hash_func()
        # Set by close() once the digest has been written out.
        self.digest: bytes | None = None

    def write(self, data: bytes | bytearray | memoryview, /) -> int:  # type: ignore[override]
        """Write data and update hash.

        Args:
            data: Data to write

        Returns:
            Number of bytes written
        """
        self.hash_obj.update(data)
        written = self.f.write(data)
        self.length += written
        return written

    def write_hash(self) -> bytes:
        """Write the hash digest to the file.

        Returns:
            The hash digest bytes
        """
        digest = self.hash_obj.digest()
        # Written directly via self.f so the digest itself is not hashed.
        self.f.write(digest)
        self.length += len(digest)
        return digest

    def close(self) -> None:
        """Close the pack file and finalize the hash."""
        self.digest = self.write_hash()
        self.f.close()

    def offset(self) -> int:
        """Get the total number of bytes written.

        Returns:
            Total bytes written
        """
        return self.length

    def tell(self) -> int:
        """Return current file position."""
        return self.f.tell()

    # BinaryIO abstract methods
    def readable(self) -> bool:
        """Check if file is readable."""
        return False

    def writable(self) -> bool:
        """Check if file is writable."""
        return True

    def seekable(self) -> bool:
        """Check if file is seekable."""
        return getattr(self.f, "seekable", lambda: False)()

    def seek(self, offset: int, whence: int = 0) -> int:
        """Seek to position in file.

        Args:
            offset: Position offset
            whence: Reference point (0=start, 1=current, 2=end)

        Returns:
            New file position
        """
        return self.f.seek(offset, whence)

    def flush(self) -> None:
        """Flush the file buffer."""
        if hasattr(self.f, "flush"):
            self.f.flush()

    def readline(self, size: int = -1) -> bytes:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("readline")

    def readlines(self, hint: int = -1) -> list[bytes]:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("readlines")

    def writelines(self, lines: Iterable[bytes], /) -> None:  # type: ignore[override]
        """Write multiple lines to the file.

        Args:
            lines: Iterable of lines to write
        """
        # Route through self.write so each line is hashed and counted.
        for line in lines:
            self.write(line)

    def read(self, size: int = -1) -> bytes:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("read")

    def __enter__(self) -> "HashWriter":
        """Enter context manager."""
        return self

    def __exit__(
        self,
        type: type | None,
        value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Exit context manager and close file (writing the hash trailer)."""
        self.close()

    def __iter__(self) -> "HashWriter":
        """Return iterator."""
        return self

    def __next__(self) -> bytes:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("__next__")

    def fileno(self) -> int:
        """Return file descriptor number."""
        return self.f.fileno()

    def isatty(self) -> bool:
        """Check if file is a terminal."""
        return getattr(self.f, "isatty", lambda: False)()

    def truncate(self, size: int | None = None) -> int:
        """Not supported for write-only file.

        Raises:
            UnsupportedOperation: Always raised
        """
        raise UnsupportedOperation("truncate")

2922 

2923 

def pack_object_header(
    type_num: int,
    delta_base: bytes | int | None,
    size: int,
    object_format: "ObjectFormat",
) -> bytearray:
    """Create a pack object header for the given object info.

    Args:
      type_num: Numeric type of the object.
      delta_base: Delta base offset or ref, or None for whole objects.
      size: Uncompressed object size.
      object_format: Object format (hash algorithm) to use.
    Returns: A header for a packed object.
    """
    header = []
    # Type-and-size varint: the first byte packs the type into bits 4-6 and
    # the low 4 bits of the size; each following byte carries 7 more size
    # bits, with the high bit (0x80) marking continuation.
    c = (type_num << 4) | (size & 15)
    size >>= 4
    while size:
        header.append(c | 0x80)
        c = size & 0x7F
        size >>= 7
    header.append(c)
    if type_num == OFS_DELTA:
        assert isinstance(delta_base, int)
        # Offset-delta base distance: big-endian, 7 bits per byte, high bit
        # set on all but the last byte. Each more-significant group is stored
        # off-by-one (the `delta_base -= 1`), per git's pack format.
        ret = [delta_base & 0x7F]
        delta_base >>= 7
        while delta_base:
            delta_base -= 1
            ret.insert(0, 0x80 | (delta_base & 0x7F))
            delta_base >>= 7
        header.extend(ret)
    elif type_num == REF_DELTA:
        assert isinstance(delta_base, bytes)
        # Ref-delta bases embed the full raw OID of the base object.
        assert len(delta_base) == object_format.oid_length
        header += delta_base
    return bytearray(header)

2961 

2962 

def pack_object_chunks(
    type: int,
    object: list[bytes] | tuple[bytes | int, list[bytes]],
    object_format: "ObjectFormat",
    *,
    compression_level: int = -1,
) -> Iterator[bytes]:
    """Yield the on-disk chunks (header plus zlib data) for one pack object.

    Args:
      type: Numeric type of the object
      object: Object to write
      object_format: Object format (hash algorithm) to use
      compression_level: the zlib compression level
    Returns: Chunks
    """
    delta_base: bytes | int | None
    if type in DELTA_TYPES:
        if not isinstance(object, tuple):
            raise TypeError("Delta types require a tuple of (delta_base, object)")
        delta_base, object = object
    else:
        delta_base = None

    # Normalize the payload into a list of byte chunks.
    if isinstance(object, bytes):
        payload = [object]
    elif isinstance(object, list):
        payload = object
    elif isinstance(object, ShaFile):
        payload = object.as_raw_chunks()
    else:
        # Shouldn't reach here with proper typing
        raise TypeError(f"Unexpected object type: {object.__class__.__name__}")

    total_size = sum(len(c) for c in payload)
    yield bytes(
        pack_object_header(type, delta_base, total_size, object_format=object_format)
    )
    compressor = zlib.compressobj(level=compression_level)
    for piece in payload:
        yield compressor.compress(piece)
    yield compressor.flush()

3007 

3008 

def write_pack_object(
    write: Callable[[bytes], int],
    type: int,
    object: list[bytes] | tuple[bytes | int, list[bytes]],
    object_format: "ObjectFormat",
    *,
    sha: "HashObject | None" = None,
    compression_level: int = -1,
) -> int:
    """Write one object to a pack file via *write*.

    Args:
      write: Write function to use
      type: Numeric type of the object
      object: Object to write
      object_format: Object format (hash algorithm) to use
      sha: Optional hasher, updated with everything written
      compression_level: the zlib compression level
    Returns: CRC32 checksum of the written object
    """
    checksum = 0
    chunk_iter = pack_object_chunks(
        type, object, compression_level=compression_level, object_format=object_format
    )
    for chunk in chunk_iter:
        write(chunk)
        if sha is not None:
            sha.update(chunk)
        checksum = binascii.crc32(chunk, checksum)
    # Mask to an unsigned 32-bit value.
    return checksum & 0xFFFFFFFF

3038 

3039 

def write_pack(
    filename: str,
    objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]],
    object_format: "ObjectFormat",
    *,
    deltify: bool | None = None,
    delta_window_size: int | None = None,
    compression_level: int = -1,
) -> tuple[bytes, bytes]:
    """Write a pack data file and its matching index file.

    Args:
      filename: Path to the new pack file (without .pack extension)
      objects: Objects to write to the pack
      object_format: Object format
      delta_window_size: Delta window size
      deltify: Whether to deltify pack objects
      compression_level: the zlib compression level
    Returns: Tuple with checksum of pack file and index file
    """
    with GitFile(filename + ".pack", "wb") as pack_f:
        entries, data_sum = write_pack_objects(
            pack_f,
            objects,
            delta_window_size=delta_window_size,
            deltify=deltify,
            compression_level=compression_level,
            object_format=object_format,
        )
    # The index wants (sha, offset, crc32) triples in sorted order.
    index_entries = sorted((sha, off, crc) for sha, (off, crc) in entries.items())
    with GitFile(filename + ".idx", "wb") as idx_f:
        idx_sha = write_pack_index(idx_f, index_entries, data_sum)
    return data_sum, idx_sha

3073 

3074 

def pack_header_chunks(num_objects: int) -> Iterator[bytes]:
    """Yield the three chunks of a version-2 pack header."""
    yield b"PACK"  # Magic signature
    yield struct.pack(b">L", 2)  # Pack format version
    yield struct.pack(b">L", num_objects)  # Object count

3080 

3081 

def write_pack_header(
    write: Callable[[bytes], int] | IO[bytes], num_objects: int
) -> None:
    """Write a pack header for the given number of objects.

    Accepts either a write callable or (deprecated) a file-like object.
    """
    if hasattr(write, "write"):
        # Legacy callers pass a file object; keep supporting them but warn.
        warnings.warn(
            "write_pack_header() now takes a write rather than file argument",
            DeprecationWarning,
            stacklevel=2,
        )
        write_fn: Callable[[bytes], int] = write.write
    else:
        write_fn = write
    for chunk in pack_header_chunks(num_objects):
        write_fn(chunk)

3098 

3099 

def find_reusable_deltas(
    container: PackedObjectContainer,
    object_ids: Set[ObjectID],
    *,
    other_haves: Set[ObjectID] | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Yield deltas already in *container* that can be reused as-is.

    A ref-delta can be reused when its base is among the objects being
    packed or among the objects the receiver already has.

    Args:
      container: Pack container to search for deltas
      object_ids: Set of object IDs to find deltas for
      other_haves: Set of other object IDs we have
      progress: Optional progress reporting callback

    Returns:
      Iterator of UnpackedObject entries that can be reused
    """
    haves = other_haves if other_haves is not None else set()
    reused = 0
    unpacked_iter = container.iter_unpacked_subset(
        object_ids, allow_missing=True, convert_ofs_delta=True
    )
    for i, unpacked in enumerate(unpacked_iter):
        if progress is not None and i % 1000 == 0:
            progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode())
        if unpacked.pack_type_num != REF_DELTA:
            continue
        hexsha = sha_to_hex(unpacked.delta_base)  # type: ignore
        if hexsha in object_ids or hexsha in haves:
            yield unpacked
            reused += 1
    if progress is not None:
        progress((f"found {reused} deltas to reuse\n").encode())

3135 

3136 

def deltify_pack_objects(
    objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, bytes | None]],
    *,
    window_size: int | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Generate deltas for pack objects.

    Args:
      objects: Iterator of plain ShaFile objects, or of (object, path)
        tuples, to deltify.
      window_size: Window size; None for default
      progress: Optional progress reporting callback
    Returns: Iterator of UnpackedObject entries; delta_base is None for
      full-text entries
    """

    def _with_hints() -> Iterator[tuple[ShaFile, tuple[int, bytes | None]]]:
        # Attach a (type_num, path) hint to every object for sorting.
        for entry in objects:
            if isinstance(entry, ShaFile):
                yield (entry, (entry.type_num, None))
            else:
                yield (entry[0], (entry[0].type_num, entry[1]))

    yield from deltas_from_sorted_objects(
        sort_objects_for_delta(_with_hints()),
        window_size=window_size,
        progress=progress,
    )

3166 

3167 

def sort_objects_for_delta(
    objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, PackHint | None]],
) -> Iterator[tuple[ShaFile, bytes | None]]:
    """Order objects by the heuristic used to find good delta bases.

    Objects are keyed by (type, path, descending size) -- the "magic
    Linus heuristic" -- so similar, larger objects come first.

    Args:
      objects: Iterator of objects or (object, hint) tuples

    Returns:
      Iterator of sorted (ShaFile, path) tuples
    """
    keyed = []
    for entry in objects:
        if isinstance(entry, tuple):
            obj, hint = entry
            type_num, path = hint if hint is not None else (None, None)
        else:
            obj = entry
            type_num, path = None, None
        # Negative length makes plain ascending sort put big objects first.
        keyed.append((type_num, path, -obj.raw_length(), obj))
    keyed.sort()
    return ((item[3], item[1]) for item in keyed)

3197 

3198 

def deltas_from_sorted_objects(
    objects: Iterator[tuple[ShaFile, bytes | None]],
    window_size: int | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Create deltas from sorted objects.

    Each object is compared against a sliding window of recently seen
    objects of the same type; the smallest representation (full text or
    delta) wins.

    Args:
      objects: Iterator of sorted (object, path) tuples to deltify
      window_size: Delta window size; None for default
      progress: Optional progress reporting callback

    Returns:
      Iterator of UnpackedObject entries; delta_base is None when the
      full text was smaller than every candidate delta
    """
    # TODO(jelmer): Use threads
    if window_size is None:
        window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE

    # Window of candidate bases: (sha digest, type_num, raw chunks),
    # most recent first.
    possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque()
    for i, (o, path) in enumerate(objects):
        if progress is not None and i % 1000 == 0:
            progress((f"generating deltas: {i}\r").encode())
        raw = o.as_raw_chunks()
        # Start with the full text as the reigning "winner"; any delta
        # must be strictly smaller to replace it.
        winner = raw
        winner_len = sum(map(len, winner))
        winner_base = None
        for base_id, base_type_num, base in possible_bases:
            # Deltas are only computed between objects of the same type.
            if base_type_num != o.type_num:
                continue
            delta_len = 0
            delta = []
            for chunk in create_delta(b"".join(base), b"".join(raw)):
                delta_len += len(chunk)
                # Abandon this candidate as soon as it grows past the
                # current winner (the break skips the for-else below).
                if delta_len >= winner_len:
                    break
                delta.append(chunk)
            else:
                # Loop completed without breaking: this delta is smaller.
                winner_base = base_id
                winner = delta
                winner_len = sum(map(len, winner))
        yield UnpackedObject(
            o.type_num,
            sha=o.sha().digest(),
            delta_base=winner_base,
            decomp_len=winner_len,
            decomp_chunks=winner,
        )
        # The current object becomes a candidate base; evict the oldest
        # entries to keep the window bounded.
        possible_bases.appendleft((o.sha().digest(), o.type_num, raw))
        while len(possible_bases) > window_size:
            possible_bases.pop()

3250 

3251 

def pack_objects_to_data(
    objects: Sequence[ShaFile]
    | Sequence[tuple[ShaFile, bytes | None]]
    | Sequence[tuple[ShaFile, PackHint | None]],
    *,
    deltify: bool | None = None,
    delta_window_size: int | None = None,
    ofs_delta: bool = True,
    progress: Callable[..., None] | None = None,
) -> tuple[int, Iterator[UnpackedObject]]:
    """Create pack data from objects.

    Args:
      objects: Pack objects
      deltify: Whether to deltify pack objects
      delta_window_size: Delta window size
      ofs_delta: Whether to use offset deltas
      progress: Optional progress reporting callback
    Returns: Tuple of (object count, iterator of UnpackedObject entries)
    """
    count = len(objects)
    if deltify is None:
        # PERFORMANCE/TODO(jelmer): This should be enabled but the python
        # implementation is *much* too slow at the moment.
        # Maybe consider enabling it just if the rust extension is available?
        deltify = False
    if not deltify:

        def _plain() -> Iterator[UnpackedObject]:
            # Strip any path hints and emit full-text objects.
            for entry in objects:
                obj = entry[0] if isinstance(entry, tuple) else entry
                yield full_unpacked_object(obj)

        return (count, _plain())
    return (
        count,
        deltify_pack_objects(
            iter(objects),  # type: ignore
            window_size=delta_window_size,
            progress=progress,
        ),
    )

3297 

3298 

def generate_unpacked_objects(
    container: PackedObjectContainer,
    object_ids: Sequence[tuple[ObjectID, PackHint | None]],
    delta_window_size: int | None = None,
    deltify: bool | None = None,
    reuse_deltas: bool = True,
    ofs_delta: bool = True,
    other_haves: set[ObjectID] | None = None,
    progress: Callable[..., None] | None = None,
) -> Iterator[UnpackedObject]:
    """Create pack data from objects in a container.

    Args:
      container: Container to read objects from
      object_ids: Sequence of (object_id, hint) tuples to pack
      delta_window_size: Delta window size; None for default
      deltify: Whether to compute new deltas (slow in pure Python)
      reuse_deltas: Whether to reuse deltas already in the container
      ofs_delta: Whether to use offset deltas
      other_haves: Additional object IDs the receiver already has
      progress: Optional progress reporting callback

    Returns: Iterator of UnpackedObject entries
    """
    remaining = dict(object_ids)
    if reuse_deltas:
        # Objects whose existing delta can be shipped verbatim are
        # emitted first and removed from the work list.
        for unpack in find_reusable_deltas(
            container, set(remaining), other_haves=other_haves, progress=progress
        ):
            del remaining[sha_to_hex(RawObjectID(unpack.sha()))]
            yield unpack
    if deltify is None:
        # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
        # slow at the moment.
        deltify = False
    if deltify:
        to_delta = container.iterobjects_subset(remaining.keys(), allow_missing=False)
        yield from deltas_from_sorted_objects(
            sort_objects_for_delta((o, remaining[o.id]) for o in to_delta),
            window_size=delta_window_size,
            progress=progress,
        )
    else:
        for oid in remaining:
            yield full_unpacked_object(container[oid])

3337 

3338 

def full_unpacked_object(o: ShaFile) -> UnpackedObject:
    """Wrap a ShaFile as a non-delta UnpackedObject.

    Args:
      o: ShaFile object to convert

    Returns:
      UnpackedObject carrying the object's full (non-delta) content
    """
    return UnpackedObject(
        o.type_num,
        sha=o.sha().digest(),
        decomp_chunks=o.as_raw_chunks(),
        delta_base=None,
        crc32=None,
    )

3355 

3356 

def write_pack_from_container(
    write: Callable[[bytes], None]
    | Callable[[bytes | bytearray | memoryview], int]
    | IO[bytes],
    container: PackedObjectContainer,
    object_ids: Sequence[tuple[ObjectID, PackHint | None]],
    object_format: "ObjectFormat",
    *,
    delta_window_size: int | None = None,
    deltify: bool | None = None,
    reuse_deltas: bool = True,
    compression_level: int = -1,
    other_haves: set[ObjectID] | None = None,
) -> tuple[dict[bytes, tuple[int, int]], bytes]:
    """Write a new pack data file sourced from an object container.

    Args:
      write: write function to use
      container: PackedObjectContainer
      object_ids: Sequence of (object_id, hint) tuples to write
      object_format: Object format (hash algorithm) to use
      delta_window_size: Sliding window size for searching for deltas;
        Set to None for default window size.
      deltify: Whether to deltify objects
      reuse_deltas: Whether to reuse existing deltas
      compression_level: the zlib compression level to use
      other_haves: Set of additional object IDs the receiver has
    Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
    """
    num_records = len(object_ids)
    records = generate_unpacked_objects(
        container,
        object_ids,
        delta_window_size=delta_window_size,
        deltify=deltify,
        reuse_deltas=reuse_deltas,
        other_haves=other_haves,
    )
    return write_pack_data(
        write,
        records,
        num_records=num_records,
        compression_level=compression_level,
        object_format=object_format,
    )

3403 

3404 

def write_pack_objects(
    write: Callable[[bytes], None] | IO[bytes],
    objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]],
    object_format: "ObjectFormat",
    *,
    delta_window_size: int | None = None,
    deltify: bool | None = None,
    compression_level: int = -1,
) -> tuple[dict[bytes, tuple[int, int]], bytes]:
    """Write a new pack data file.

    Args:
      write: write function to use
      objects: Sequence of (object, path) tuples to write
      object_format: Object format (hash algorithm) to use
      delta_window_size: Sliding window size for searching for deltas;
        Set to None for default window size.
      deltify: Whether to deltify objects
      compression_level: the zlib compression level to use
    Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
    """
    # Forward delta_window_size; previously it was accepted but silently
    # dropped, so callers' window-size settings had no effect.
    pack_contents_count, pack_contents = pack_objects_to_data(
        objects,
        deltify=deltify,
        delta_window_size=delta_window_size,
    )

    return write_pack_data(
        write,
        pack_contents,
        num_records=pack_contents_count,
        compression_level=compression_level,
        object_format=object_format,
    )

3435 

3436 

class PackChunkGenerator:
    """Generator for pack data chunks.

    Streams a complete pack file (header, objects, trailing checksum) as
    byte chunks while recording per-object offsets and CRC32s in
    ``entries``.
    """

    def __init__(
        self,
        object_format: "ObjectFormat",
        num_records: int | None = None,
        records: Iterator[UnpackedObject] | None = None,
        progress: Callable[..., None] | None = None,
        compression_level: int = -1,
        reuse_compressed: bool = True,
    ) -> None:
        """Initialize PackChunkGenerator.

        Args:
          object_format: Object format (hash algorithm) to use
          num_records: Expected number of records
          records: Iterator of pack records
          progress: Optional progress callback
          compression_level: Compression level (-1 for default)
          reuse_compressed: Whether to reuse compressed chunks
        """
        self.object_format = object_format
        # Running checksum over everything emitted; becomes the pack trailer.
        self.cs = object_format.new_hash()
        # sha digest -> (offset, crc32) for every object written so far.
        self.entries: dict[bytes, tuple[int, int]] = {}
        if records is None:
            records = iter([])  # Empty iterator if None
        self._it = self._pack_data_chunks(
            records=records,
            num_records=num_records,
            progress=progress,
            compression_level=compression_level,
            reuse_compressed=reuse_compressed,
        )

    def sha1digest(self) -> bytes:
        """Return the pack checksum digest (named for the historic SHA-1 case)."""
        return self.cs.digest()

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over pack data chunks."""
        return self._it

    def _pack_data_chunks(
        self,
        records: Iterator[UnpackedObject],
        *,
        num_records: int | None = None,
        progress: Callable[..., None] | None = None,
        compression_level: int = -1,
        reuse_compressed: bool = True,
    ) -> Iterator[bytes]:
        """Iterate pack data file chunks.

        Args:
          records: Iterator over UnpackedObject
          num_records: Number of records (defaults to len(records) if not specified)
          progress: Function to report progress to
          compression_level: the zlib compression level
          reuse_compressed: Whether to reuse compressed chunks
        Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
        """
        # Write the pack
        if num_records is None:
            num_records = len(records)  # type: ignore
        offset = 0
        for chunk in pack_header_chunks(num_records):
            yield chunk
            self.cs.update(chunk)
            offset += len(chunk)
        actual_num_records = 0
        for i, unpacked in enumerate(records):
            type_num = unpacked.pack_type_num
            if progress is not None and i % 1000 == 0:
                progress((f"writing pack data: {i}/{num_records}\r").encode("ascii"))
            raw: list[bytes] | tuple[int, list[bytes]] | tuple[bytes, list[bytes]]
            if unpacked.delta_base is not None:
                assert isinstance(unpacked.delta_base, bytes), (
                    f"Expected bytes, got {type(unpacked.delta_base)}"
                )
                try:
                    base_offset, _base_crc32 = self.entries[unpacked.delta_base]
                except KeyError:
                    # Base not written yet in this pack: fall back to a
                    # ref-delta that names the base by its sha.
                    type_num = REF_DELTA
                    assert isinstance(unpacked.delta_base, bytes)
                    raw = (unpacked.delta_base, unpacked.decomp_chunks)
                else:
                    # Base already written: use the cheaper offset delta.
                    type_num = OFS_DELTA
                    raw = (offset - base_offset, unpacked.decomp_chunks)
            else:
                raw = unpacked.decomp_chunks
            chunks: list[bytes] | Iterator[bytes]
            if unpacked.comp_chunks is not None and reuse_compressed:
                # Already-compressed data can be copied through verbatim.
                chunks = unpacked.comp_chunks
            else:
                chunks = pack_object_chunks(
                    type_num,
                    raw,
                    compression_level=compression_level,
                    object_format=self.object_format,
                )
            crc32 = 0
            object_size = 0
            for chunk in chunks:
                yield chunk
                crc32 = binascii.crc32(chunk, crc32)
                self.cs.update(chunk)
                object_size += len(chunk)
            actual_num_records += 1
            self.entries[unpacked.sha()] = (offset, crc32)
            offset += object_size
        # The header already promised num_records objects; a mismatch
        # would produce a corrupt pack.
        if actual_num_records != num_records:
            raise AssertionError(
                f"actual records written differs: {actual_num_records} != {num_records}"
            )

        yield self.cs.digest()

3554 

3555 

def write_pack_data(
    write: Callable[[bytes], None]
    | Callable[[bytes | bytearray | memoryview], int]
    | IO[bytes],
    records: Iterator[UnpackedObject],
    object_format: "ObjectFormat",
    *,
    num_records: int | None = None,
    progress: Callable[..., None] | None = None,
    compression_level: int = -1,
) -> tuple[dict[bytes, tuple[int, int]], bytes]:
    """Write a new pack data file.

    Args:
      write: Write function (or file-like object) to use
      records: Iterator over UnpackedObject entries to write
      object_format: Object format (hash algorithm) to use
      num_records: Number of records (defaults to len(records) if None)
      progress: Function to report progress to
      compression_level: the zlib compression level
    Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
    """
    generator = PackChunkGenerator(
        num_records=num_records,
        records=records,
        progress=progress,
        compression_level=compression_level,
        object_format=object_format,
    )
    # Resolve the write callable once up front.
    writer = write if callable(write) else write.write
    for chunk in generator:
        writer(chunk)
    return generator.entries, generator.sha1digest()

3591 

3592 

def write_pack_index_v1(
    f: IO[bytes],
    entries: Iterable[tuple[bytes, int, int | None]],
    pack_checksum: bytes,
) -> bytes:
    """Write a new pack index file in v1 format.

    Args:
      f: A file-like object to write to
      entries: Iterable of tuples with object name (sha), offset_in_pack,
        and crc32_checksum. May be a one-shot iterator.
      pack_checksum: Checksum of the pack file.
    Returns: The SHA of the written index file

    Raises:
      TypeError: If a name is not a 20-byte SHA-1 or an offset does not
        fit in 32 bits.
    """
    # Materialize the entries: they are traversed twice below (fan-out
    # counting, then entry writing), so a generator input would otherwise
    # produce an empty second pass and a corrupt index. This mirrors the
    # explicit list() in write_pack_index_v2.
    entries_list = list(entries)
    f = SHA1Writer(f)
    fan_out_table: dict[int, int] = defaultdict(lambda: 0)
    for name, _offset, _entry_checksum in entries_list:
        fan_out_table[ord(name[:1])] += 1
    # Fan-out table: cumulative counts of objects per first-byte bucket.
    for i in range(0x100):
        f.write(struct.pack(">L", fan_out_table[i]))
        fan_out_table[i + 1] += fan_out_table[i]
    for name, offset, _entry_checksum in entries_list:
        if len(name) != 20:
            raise TypeError("pack index v1 only supports SHA-1 names")
        if not (offset <= 0xFFFFFFFF):
            raise TypeError("pack format 1 only supports offsets < 2Gb")
        f.write(struct.pack(">L20s", offset, name))
    assert len(pack_checksum) == 20
    f.write(pack_checksum)
    return f.write_sha()

3624 

3625 

3626def _delta_encode_size(size: int) -> bytes: 

3627 ret = bytearray() 

3628 c = size & 0x7F 

3629 size >>= 7 

3630 while size: 

3631 ret.append(c | 0x80) 

3632 c = size & 0x7F 

3633 size >>= 7 

3634 ret.append(c) 

3635 return bytes(ret) 

3636 

3637 

# The length of delta compression copy operations in version 2 packs is limited
# to 64K. To copy more, we use several copy operations. Version 3 packs allow
# 24-bit lengths in copy operations, but we always make version 2 packs.
_MAX_COPY_LEN = 0xFFFF  # largest length encodable in one v2 copy opcode

3642 

3643 

3644def _encode_copy_operation(start: int, length: int) -> bytes: 

3645 scratch = bytearray([0x80]) 

3646 for i in range(4): 

3647 if start & 0xFF << i * 8: 

3648 scratch.append((start >> i * 8) & 0xFF) 

3649 scratch[0] |= 1 << i 

3650 for i in range(2): 

3651 if length & 0xFF << i * 8: 

3652 scratch.append((length >> i * 8) & 0xFF) 

3653 scratch[0] |= 1 << (4 + i) 

3654 return bytes(scratch) 

3655 

3656 

def _create_delta_py(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
    """Use python difflib to work out how to transform base_buf to target_buf.

    Args:
      base_buf: Base buffer
      target_buf: Target buffer

    Yields:
      Delta chunks: the two size headers, then copy/insert opcodes.
    """
    if isinstance(base_buf, list):
        base_buf = b"".join(base_buf)
    if isinstance(target_buf, list):
        target_buf = b"".join(target_buf)
    assert isinstance(base_buf, bytes)
    assert isinstance(target_buf, bytes)
    # Delta header: sizes of source and target buffers.
    yield _delta_encode_size(len(base_buf))
    yield _delta_encode_size(len(target_buf))
    target_view = memoryview(target_buf)
    matcher = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf)
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        # Git deltas have no delete opcode: deleted ranges are simply
        # never copied from the base.
        if tag == "equal":
            # Copy the matching range from the base, split into pieces of
            # at most _MAX_COPY_LEN (pack v2 copy-length limit).
            pos = i1
            remaining = i2 - i1
            while remaining > 0:
                step = min(remaining, _MAX_COPY_LEN)
                yield _encode_copy_operation(pos, step)
                pos += step
                remaining -= step
        elif tag in ("replace", "insert"):
            # Emit literal bytes from the target in runs of at most 127
            # (the insert opcode's size must fit in 7 bits).
            size = j2 - j1
            pos = j1
            while size > 127:
                yield bytes([127])
                yield bytes(target_view[pos : pos + 127])
                size -= 127
                pos += 127
            yield bytes([size])
            yield bytes(target_view[pos : pos + size])

3701 

3702 

# Default to pure Python implementation
# (the public name is bound to the "_py" variant here; presumably an
# accelerated implementation can rebind it — TODO confirm)
create_delta = _create_delta_py

3705 

3706 

def apply_delta(
    src_buf: bytes | list[bytes], delta: bytes | list[bytes]
) -> list[bytes]:
    """Based on the similar function in git's patch-delta.c.

    Applies git-format delta instructions to *src_buf* and returns the
    reconstructed target.

    Args:
      src_buf: Source buffer
      delta: Delta instructions

    Returns:
      Target buffer contents as a list of byte chunks.

    Raises:
      ApplyDeltaError: If the delta is malformed or the declared sizes do
        not match the actual data.
    """
    # Normalize chunked inputs into contiguous byte strings.
    if not isinstance(src_buf, bytes):
        src_buf = b"".join(src_buf)
    if not isinstance(delta, bytes):
        delta = b"".join(delta)
    out = []
    index = 0
    delta_length = len(delta)

    def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]:
        # Decode a variable-length size: 7 bits per byte, LSB group
        # first; high bit set means another byte follows.
        size = 0
        i = 0
        while delta:
            cmd = ord(delta[index : index + 1])
            index += 1
            size |= (cmd & ~0x80) << i
            i += 7
            if not cmd & 0x80:
                break
        return size, index

    # The delta starts with the expected source and target sizes.
    src_size, index = get_delta_header_size(delta, index)
    dest_size, index = get_delta_header_size(delta, index)
    if src_size != len(src_buf):
        raise ApplyDeltaError(
            f"Unexpected source buffer size: {src_size} vs {len(src_buf)}"
        )
    while index < delta_length:
        cmd = ord(delta[index : index + 1])
        index += 1
        if cmd & 0x80:
            # Copy opcode: bits 0-3 select offset bytes, bits 4-6 select
            # length bytes; absent bytes are zero.
            cp_off = 0
            for i in range(4):
                if cmd & (1 << i):
                    x = ord(delta[index : index + 1])
                    index += 1
                    cp_off |= x << (i * 8)
            cp_size = 0
            # Version 3 packs can contain copy sizes larger than 64K.
            for i in range(3):
                if cmd & (1 << (4 + i)):
                    x = ord(delta[index : index + 1])
                    index += 1
                    cp_size |= x << (i * 8)
            if cp_size == 0:
                # A zero length encodes the special value 64K.
                cp_size = 0x10000
            if (
                cp_off + cp_size < cp_size
                or cp_off + cp_size > src_size
                or cp_size > dest_size
            ):
                # Out-of-range copy: stop decoding; the trailing-data and
                # size checks below will report the error.
                break
            out.append(src_buf[cp_off : cp_off + cp_size])
        elif cmd != 0:
            # Insert opcode: the low 7 bits give a literal byte count.
            out.append(delta[index : index + cmd])
            index += cmd
        else:
            raise ApplyDeltaError("Invalid opcode 0")

    if index != delta_length:
        raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}")

    if dest_size != chunks_length(out):
        raise ApplyDeltaError("dest size incorrect")

    return out

3781 

3782 

def write_pack_index_v2(
    f: IO[bytes],
    entries: Iterable[tuple[bytes, int, int | None]],
    pack_checksum: bytes,
) -> bytes:
    """Write a new pack index file in v2 format.

    Args:
      f: File-like object to write to
      entries: Iterable of tuples with object name (sha), offset_in_pack,
        and crc32_checksum.
      pack_checksum: Checksum of the pack file.
    Returns: The checksum of the index file written

    Raises:
      ValueError: If the pack checksum length matches no supported hash.
      TypeError: If an object name has an unexpected length.
    """
    # Determine hash algorithm from pack_checksum length
    checksum_len = len(pack_checksum)
    if checksum_len == 20:
        hash_func = sha1
    elif checksum_len == 32:
        hash_func = sha256
    else:
        raise ValueError(f"Unsupported pack checksum length: {len(pack_checksum)}")

    writer = HashWriter(f, hash_func)
    writer.write(b"\377tOc")  # Magic!
    writer.write(struct.pack(">L", 2))

    # Materialize so the iterable can be traversed several times below.
    entry_rows = list(entries)

    fan_out_table: dict[int, int] = defaultdict(lambda: 0)
    for name, _offset, _crc in entry_rows:
        fan_out_table[ord(name[:1])] += 1

    # Infer the hash size from the first entry; an empty index falls back
    # to the pack checksum length.
    hash_size = len(entry_rows[0][0]) if entry_rows else checksum_len

    # Fan-out table: cumulative counts per first-byte bucket.
    largetable: list[int] = []
    for i in range(0x100):
        writer.write(struct.pack(b">L", fan_out_table[i]))
        fan_out_table[i + 1] += fan_out_table[i]
    # Object names table.
    for name, _offset, _crc in entry_rows:
        if len(name) != hash_size:
            raise TypeError(
                f"Object name has wrong length: expected {hash_size}, got {len(name)}"
            )
        writer.write(name)
    # CRC32 table.
    for _name, _offset, crc in entry_rows:
        writer.write(struct.pack(b">L", crc))
    # Offset table; offsets >= 2**31 are spilled into the 64-bit table.
    for _name, offset, _crc in entry_rows:
        if offset < 2**31:
            writer.write(struct.pack(b">L", offset))
        else:
            writer.write(struct.pack(b">L", 2**31 + len(largetable)))
            largetable.append(offset)
    for offset in largetable:
        writer.write(struct.pack(b">Q", offset))
    writer.write(pack_checksum)
    return writer.write_hash()

3844 

3845 

def write_pack_index_v3(
    f: IO[bytes],
    entries: Iterable[tuple[bytes, int, int | None]],
    pack_checksum: bytes,
    hash_format: int = 1,
) -> bytes:
    """Write a new pack index file in v3 format.

    Args:
      f: File-like object to write to
      entries: Iterable of tuples with object name (sha), offset_in_pack,
        and crc32_checksum.
      pack_checksum: Checksum of the pack file.
      hash_format: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
    Returns: The SHA of the index file written

    Raises:
      NotImplementedError: If hash_format requests SHA-256.
      ValueError: For an unknown hash format or a wrongly-sized object name.
    """
    if hash_format == 1:
        hash_size = 20  # SHA-1
        writer_cls = SHA1Writer
    elif hash_format == 2:
        hash_size = 32  # SHA-256
        # TODO: Add SHA256Writer when SHA-256 support is implemented
        raise NotImplementedError("SHA-256 support not yet implemented")
    else:
        raise ValueError(f"Unknown hash algorithm {hash_format}")

    # Materialize so the iterable can be traversed several times below.
    rows = list(entries)

    # Shortest unambiguous object-name prefix; currently always the full
    # hash size (could be optimized).
    shortened_oid_len = hash_size

    f = writer_cls(f)
    f.write(b"\377tOc")  # Magic!
    f.write(struct.pack(">L", 3))  # Version 3
    f.write(struct.pack(">L", hash_format))  # Hash algorithm
    f.write(struct.pack(">L", shortened_oid_len))  # Shortened OID length

    fan_out_table: dict[int, int] = defaultdict(lambda: 0)
    for name, _offset, _crc in rows:
        if len(name) != hash_size:
            raise ValueError(
                f"Object name has wrong length: expected {hash_size}, got {len(name)}"
            )
        fan_out_table[ord(name[:1])] += 1

    # Fan-out table: cumulative counts per first-byte bucket.
    largetable: list[int] = []
    for i in range(0x100):
        f.write(struct.pack(b">L", fan_out_table[i]))
        fan_out_table[i + 1] += fan_out_table[i]

    # Object names table
    for name, _offset, _crc in rows:
        f.write(name)

    # CRC32 checksums table
    for _name, _offset, crc in rows:
        f.write(struct.pack(b">L", crc))

    # Offset table; offsets >= 2**31 are spilled into the 64-bit table.
    for _name, offset, _crc in rows:
        if offset < 2**31:
            f.write(struct.pack(b">L", offset))
        else:
            f.write(struct.pack(b">L", 2**31 + len(largetable)))
            largetable.append(offset)

    # Large offset table
    for offset in largetable:
        f.write(struct.pack(b">Q", offset))

    assert len(pack_checksum) == hash_size, (
        f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}"
    )
    f.write(pack_checksum)
    return f.write_sha()

3924 

3925 

def write_pack_index(
    f: IO[bytes],
    entries: Iterable[tuple[bytes, int, int | None]],
    pack_checksum: bytes,
    progress: Callable[..., None] | None = None,
    version: int | None = None,
) -> bytes:
    """Write a pack index file.

    Args:
      f: File-like object to write to.
      entries: List of (checksum, offset, crc32) tuples
      pack_checksum: Checksum of the pack file.
      progress: Progress function (not currently used)
      version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION.

    Returns:
      SHA of the written index file

    Raises:
      ValueError: If an unsupported version is specified
    """
    chosen = DEFAULT_PACK_INDEX_VERSION if version is None else version

    # Dispatch table mapping index format version to its writer.
    writers: dict[int, Callable[..., bytes]] = {
        1: write_pack_index_v1,
        2: write_pack_index_v2,
        3: write_pack_index_v3,
    }
    if chosen not in writers:
        raise ValueError(f"Unsupported pack index version: {chosen}")
    return writers[chosen](f, entries, pack_checksum)

3959 

3960 

class Pack:
    """A Git pack object."""

    # Lazy loaders for the data/index; None once the corresponding object
    # has been supplied eagerly (see from_objects / from_lazy_objects).
    _data_load: Callable[[], PackData] | None
    _idx_load: Callable[[], PackIndex] | None

    _data: PackData | None
    _idx: PackIndex | None
    _bitmap: "PackBitmap | None"

    def __init__(
        self,
        basename: str,
        *,
        object_format: ObjectFormat,
        resolve_ext_ref: ResolveExtRefFn | None = None,
        delta_window_size: int | None = None,
        window_memory: int | None = None,
        delta_cache_size: int | None = None,
        depth: int | None = None,
        threads: int | None = None,
        big_file_threshold: int | None = None,
    ) -> None:
        """Initialize a Pack object.

        Args:
          basename: Base path for pack files (without .pack/.idx extension)
          object_format: Hash algorithm used by the repository
          resolve_ext_ref: Optional function to resolve external references
          delta_window_size: Size of the delta compression window
          window_memory: Memory limit for delta compression window
          delta_cache_size: Size of the delta cache
          depth: Maximum depth for delta chains
          threads: Number of threads to use for operations
          big_file_threshold: Size threshold for big file handling
        """
        self._basename = basename
        self.object_format = object_format
        self._data = None
        self._idx = None
        self._bitmap = None
        self._idx_path = self._basename + ".idx"
        self._data_path = self._basename + ".pack"
        self._bitmap_path = self._basename + ".bitmap"
        self.delta_window_size = delta_window_size
        self.window_memory = window_memory
        self.delta_cache_size = delta_cache_size
        self.depth = depth
        self.threads = threads
        self.big_file_threshold = big_file_threshold
        # Index and data are opened lazily, on first access through the
        # `index` / `data` properties.
        self._idx_load = lambda: load_pack_index(self._idx_path, object_format)
        self._data_load = lambda: PackData(
            self._data_path,
            delta_window_size=delta_window_size,
            window_memory=window_memory,
            delta_cache_size=delta_cache_size,
            depth=depth,
            threads=threads,
            big_file_threshold=big_file_threshold,
            object_format=object_format,
        )
        self.resolve_ext_ref = resolve_ext_ref

    @classmethod
    def from_lazy_objects(
        cls,
        data_fn: Callable[[], PackData],
        idx_fn: Callable[[], PackIndex],
    ) -> "Pack":
        """Create a new pack object from callables to load pack data and index objects."""
        # Load index to get object format
        idx = idx_fn()
        ret = cls("", object_format=idx.object_format)
        ret._data_load = data_fn
        ret._idx = idx
        ret._idx_load = None
        return ret

    @classmethod
    def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack":
        """Create a new pack object from pack data and index objects."""
        ret = cls("", object_format=idx.object_format)
        ret._data = data
        ret._data_load = None
        ret._idx = idx
        ret._idx_load = None
        ret.check_length_and_checksum()
        return ret

    def name(self) -> bytes:
        """The SHA over the SHAs of the objects in this pack."""
        return self.index.objects_sha1()

    @property
    def data(self) -> PackData:
        """The pack data object being used."""
        if self._data is None:
            assert self._data_load
            self._data = self._data_load()
            # Verify index/data agree as soon as both sides are available.
            self.check_length_and_checksum()
        return self._data

    @property
    def index(self) -> PackIndex:
        """The index being used.

        Note: This may be an in-memory index
        """
        if self._idx is None:
            assert self._idx_load
            self._idx = self._idx_load()
        return self._idx

    @property
    def bitmap(self) -> "PackBitmap | None":
        """The bitmap being used, if available.

        Returns:
          PackBitmap instance or None if no bitmap exists

        Raises:
          ValueError: If bitmap file is invalid or corrupt
        """
        if self._bitmap is None:
            # Imported lazily to avoid a module-level import cycle.
            from .bitmap import read_bitmap

            self._bitmap = read_bitmap(self._bitmap_path, pack_index=self.index)
        return self._bitmap

    def ensure_bitmap(
        self,
        object_store: "BaseObjectStore",
        refs: dict["Ref", "ObjectID"],
        commit_interval: int | None = None,
        progress: Callable[[str], None] | None = None,
    ) -> "PackBitmap":
        """Ensure a bitmap exists for this pack, generating one if needed.

        Args:
          object_store: Object store to read objects from
          refs: Dictionary of ref names to commit SHAs
          commit_interval: Include every Nth commit in bitmap index
          progress: Optional progress reporting callback

        Returns:
          PackBitmap instance (either existing or newly generated)
        """
        from .bitmap import generate_bitmap, write_bitmap

        # Check if bitmap already exists
        try:
            existing = self.bitmap
            if existing is not None:
                return existing
        except FileNotFoundError:
            pass  # No bitmap, we'll generate one

        # Generate new bitmap
        if progress:
            progress(f"Generating bitmap for {self.name().decode('utf-8')}...\n")

        pack_bitmap = generate_bitmap(
            self.index,
            object_store,
            refs,
            self.get_stored_checksum(),
            commit_interval=commit_interval,
            progress=progress,
        )

        # Write bitmap file
        write_bitmap(self._bitmap_path, pack_bitmap)

        if progress:
            progress(f"Wrote {self._bitmap_path}\n")

        # Update cached bitmap
        self._bitmap = pack_bitmap

        return pack_bitmap

    def close(self) -> None:
        """Close the pack file and index."""
        if self._data is not None:
            self._data.close()
            self._data = None
        if self._idx is not None:
            self._idx.close()
            self._idx = None

    def __del__(self) -> None:
        """Ensure pack file is closed when Pack is garbage collected."""
        if self._data is not None or self._idx is not None:
            import warnings

            warnings.warn(
                f"unclosed Pack {self!r}", ResourceWarning, stacklevel=2, source=self
            )
            try:
                self.close()
            except Exception:
                # Ignore errors during cleanup
                pass

    def __enter__(self) -> "Pack":
        """Enter context manager."""
        return self

    def __exit__(
        self,
        exc_type: type | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager."""
        self.close()

    def __eq__(self, other: object) -> bool:
        """Check equality with another pack."""
        if not isinstance(other, Pack):
            return False
        # Equality is defined by the indexes only, not the data files.
        return self.index == other.index

    def __len__(self) -> int:
        """Number of entries in this pack."""
        return len(self.index)

    def __repr__(self) -> str:
        """Return string representation of this pack."""
        return f"{self.__class__.__name__}({self._basename!r})"

    def __iter__(self) -> Iterator[ObjectID]:
        """Iterate over all the sha1s of the objects in this pack."""
        return iter(self.index)

    def check_length_and_checksum(self) -> None:
        """Sanity check the length and checksum of the pack index and data."""
        assert len(self.index) == len(self.data), (
            f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
        )
        idx_stored_checksum = self.index.get_pack_checksum()
        data_stored_checksum = self.data.get_stored_checksum()
        if (
            idx_stored_checksum is not None
            and idx_stored_checksum != data_stored_checksum
        ):
            raise ChecksumMismatch(
                sha_to_hex(RawObjectID(idx_stored_checksum)),
                sha_to_hex(RawObjectID(data_stored_checksum)),
            )

    def check(self) -> None:
        """Check the integrity of this pack.

        Raises:
          ChecksumMismatch: if a checksum for the index or data is wrong
        """
        self.index.check()
        self.data.check()
        for obj in self.iterobjects():
            obj.check()
        # TODO: object connectivity checks

    def get_stored_checksum(self) -> bytes:
        """Return the stored checksum of the pack data."""
        return self.data.get_stored_checksum()

    def pack_tuples(self) -> list[tuple[ShaFile, None]]:
        """Return pack tuples for all objects in pack."""
        return [(o, None) for o in self.iterobjects()]

    def __contains__(self, sha1: ObjectID | RawObjectID) -> bool:
        """Check whether this pack contains a particular SHA1."""
        try:
            self.index.object_offset(sha1)
            return True
        except KeyError:
            return False

    def get_raw(self, sha1: RawObjectID | ObjectID) -> tuple[int, bytes]:
        """Get raw object data by SHA1."""
        offset = self.index.object_offset(sha1)
        obj_type, obj = self.data.get_object_at(offset)
        # resolve_object walks any delta chain down to the base object.
        type_num, chunks = self.resolve_object(offset, obj_type, obj)
        return type_num, b"".join(chunks)  # type: ignore[arg-type]

    def __getitem__(self, sha1: "ObjectID | RawObjectID") -> ShaFile:
        """Retrieve the specified SHA1."""
        type, uncomp = self.get_raw(sha1)
        return ShaFile.from_raw_string(type, uncomp, sha=sha1)

    def iterobjects(self) -> Iterator[ShaFile]:
        """Iterate over the objects in this pack."""
        return iter(
            PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref)
        )

    def iterobjects_subset(
        self, shas: Iterable[ObjectID], *, allow_missing: bool = False
    ) -> Iterator[ShaFile]:
        """Iterate over a subset of objects in this pack."""
        # NOTE(review): `shas` is used twice (handed to the inflater and
        # membership-tested below), so callers should pass a container,
        # not a one-shot iterator.
        return (
            uo
            for uo in PackInflater.for_pack_subset(
                self,
                shas,
                allow_missing=allow_missing,
                resolve_ext_ref=self.resolve_ext_ref,
            )
            if uo.id in shas
        )

    def iter_unpacked_subset(
        self,
        shas: Iterable[ObjectID | RawObjectID],
        *,
        include_comp: bool = False,
        allow_missing: bool = False,
        convert_ofs_delta: bool = False,
    ) -> Iterator[UnpackedObject]:
        """Iterate over unpacked objects in subset."""
        # Offset deltas whose base has not been seen yet, keyed by the
        # base's offset; flushed when the base is reached.
        ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list)
        ofs: dict[int, bytes] = {}
        todo: set[ObjectID | RawObjectID] = set(shas)
        for unpacked in self.iter_unpacked(include_comp=include_comp):
            sha = unpacked.sha()
            if unpacked.offset is not None:
                ofs[unpacked.offset] = sha
            hexsha = sha_to_hex(RawObjectID(sha))
            if hexsha in todo:
                if unpacked.pack_type_num == OFS_DELTA:
                    assert isinstance(unpacked.delta_base, int)
                    assert unpacked.offset is not None
                    base_offset = unpacked.offset - unpacked.delta_base
                    try:
                        # Rewrite the offset delta into a ref delta so the
                        # result is self-contained.
                        unpacked.delta_base = ofs[base_offset]
                    except KeyError:
                        ofs_pending[base_offset].append(unpacked)
                        continue
                    else:
                        unpacked.pack_type_num = REF_DELTA
                yield unpacked
                todo.remove(hexsha)
            if unpacked.offset is not None:
                # Flush any deltas that were waiting for this object.
                for child in ofs_pending.pop(unpacked.offset, []):
                    child.pack_type_num = REF_DELTA
                    child.delta_base = sha
                    yield child
        assert not ofs_pending
        if not allow_missing and todo:
            raise UnresolvedDeltas(list(todo))

    def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]:
        """Iterate over all unpacked objects in this pack."""
        # Pre-index entries by pack offset so each unpacked object can be
        # annotated with its sha and crc32 from the index.
        ofs_to_entries = {
            ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries()
        }
        for unpacked in self.data.iter_unpacked(include_comp=include_comp):
            assert unpacked.offset is not None
            (sha, crc32) = ofs_to_entries[unpacked.offset]
            unpacked._sha = sha
            unpacked.crc32 = crc32
            yield unpacked

    def keep(self, msg: bytes | None = None) -> str:
        """Add a .keep file for the pack, preventing git from garbage collecting it.

        Args:
          msg: A message written inside the .keep file; can be used later
            to determine whether or not a .keep file is obsolete.
        Returns: The path of the .keep file, as a string.
        """
        keepfile_name = f"{self._basename}.keep"
        with GitFile(keepfile_name, "wb") as keepfile:
            if msg:
                keepfile.write(msg)
                keepfile.write(b"\n")
        return keepfile_name

    def get_ref(
        self, sha: RawObjectID | ObjectID
    ) -> tuple[int | None, int, OldUnpackedObject]:
        """Get the object for a ref SHA, only looking in this pack."""
        # TODO: cache these results
        try:
            offset = self.index.object_offset(sha)
        except KeyError:
            offset = None
        if offset:
            type, obj = self.data.get_object_at(offset)
        elif self.resolve_ext_ref:
            # Not in this pack; fall back to the external resolver
            # (used for thin packs).
            type, obj = self.resolve_ext_ref(sha)
        else:
            raise KeyError(sha)
        return offset, type, obj

    def resolve_object(
        self,
        offset: int,
        type: int,
        obj: OldUnpackedObject,
        get_ref: Callable[
            [RawObjectID | ObjectID], tuple[int | None, int, OldUnpackedObject]
        ]
        | None = None,
    ) -> tuple[int, OldUnpackedObject]:
        """Resolve an object, possibly resolving deltas when necessary.

        Returns: Tuple with object type and contents.
        """
        # Walk down the delta chain, building a stack of deltas to reach
        # the requested object.
        base_offset: int | None = offset
        base_type = type
        base_obj = obj
        delta_stack = []
        while base_type in DELTA_TYPES:
            prev_offset = base_offset
            if get_ref is None:
                get_ref = self.get_ref
            if base_type == OFS_DELTA:
                (delta_offset, delta) = base_obj
                # TODO: clean up asserts and replace with nicer error messages
                assert isinstance(delta_offset, int), (
                    f"Expected int, got {delta_offset.__class__}"
                )
                assert base_offset is not None
                base_offset = base_offset - delta_offset
                base_type, base_obj = self.data.get_object_at(base_offset)
                assert isinstance(base_type, int)
            elif base_type == REF_DELTA:
                (basename, delta) = base_obj
                assert (
                    isinstance(basename, bytes)
                    and len(basename) == self.object_format.oid_length
                )
                base_offset_temp, base_type, base_obj = get_ref(RawObjectID(basename))
                assert isinstance(base_type, int)
                # base_offset_temp can be None for thin packs (external references)
                base_offset = base_offset_temp
                if base_offset == prev_offset:  # object is based on itself
                    raise UnresolvedDeltas([basename])
            delta_stack.append((prev_offset, base_type, delta))

        # Now grab the base object (mustn't be a delta) and apply the
        # deltas all the way up the stack.
        chunks = base_obj
        for prev_offset, _delta_type, delta in reversed(delta_stack):
            # Convert chunks to bytes for apply_delta if needed
            if isinstance(chunks, list):
                chunks_bytes = b"".join(chunks)
            elif isinstance(chunks, tuple):
                # For tuple type, second element is the actual data
                _, chunk_data = chunks
                if isinstance(chunk_data, list):
                    chunks_bytes = b"".join(chunk_data)
                else:
                    chunks_bytes = chunk_data
            else:
                chunks_bytes = chunks

            # Apply delta and get result as list
            chunks = apply_delta(chunks_bytes, delta)

            if prev_offset is not None:
                # Cache the resolved intermediate so later lookups in the
                # same chain are cheap.
                self.data._offset_cache[prev_offset] = base_type, chunks
        return base_type, chunks

    def entries(
        self, progress: Callable[[int, int], None] | None = None
    ) -> Iterator[PackIndexEntry]:
        """Yield entries summarizing the contents of this pack.

        Args:
          progress: Progress function, called with current and total
            object count.
        Returns: iterator of tuples with (sha, offset, crc32)
        """
        return self.data.iterentries(
            progress=progress, resolve_ext_ref=self.resolve_ext_ref
        )

    def sorted_entries(
        self, progress: Callable[[int, int], None] | None = None
    ) -> Iterator[PackIndexEntry]:
        """Return entries in this pack, sorted by SHA.

        Args:
          progress: Progress function, called with current and total
            object count
        Returns: Iterator of tuples with (sha, offset, crc32)
        """
        return iter(
            self.data.sorted_entries(
                progress=progress, resolve_ext_ref=self.resolve_ext_ref
            )
        )

    def get_unpacked_object(
        self,
        sha: ObjectID | RawObjectID,
        *,
        include_comp: bool = False,
        convert_ofs_delta: bool = True,
    ) -> UnpackedObject:
        """Get the unpacked object for a sha.

        Args:
          sha: SHA of object to fetch
          include_comp: Whether to include compression data in UnpackedObject
          convert_ofs_delta: Whether to convert offset deltas to ref deltas
        """
        offset = self.index.object_offset(sha)
        unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp)
        if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta:
            assert isinstance(unpacked.delta_base, int)
            unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base)
            unpacked.pack_type_num = REF_DELTA
        return unpacked

4480 

4481 

def extend_pack(
    f: BinaryIO,
    object_ids: Set["RawObjectID"],
    get_raw: Callable[["RawObjectID | ObjectID"], tuple[int, bytes]],
    object_format: "ObjectFormat",
    *,
    compression_level: int = -1,
    progress: Callable[[bytes], None] | None = None,
) -> tuple[bytes, list[tuple[RawObjectID, int, int]]]:
    """Extend a pack file with more objects.

    The caller should make sure that object_ids does not contain any objects
    that are already in the pack

    Args:
      f: Open, seekable pack file (positioned anywhere; it is rewound).
      object_ids: Object IDs to append to the pack.
      get_raw: Callable returning (type_num, raw_data) for an object ID.
      object_format: Hash algorithm used for the pack trailer checksum.
      compression_level: zlib compression level for the appended objects.
      progress: Optional callback receiving progress messages as bytes.

    Returns:
      Tuple of (new pack trailer checksum, list of (object_id, offset,
      crc32) entries for the appended objects).
    """
    # Update the header with the new number of objects.
    f.seek(0)
    _version, num_objects = read_pack_header(f.read)

    if object_ids:
        f.seek(0)
        write_pack_header(f.write, num_objects + len(object_ids))

        # Must flush before reading (http://bugs.python.org/issue3207)
        f.flush()

    # Rescan the rest of the pack, computing the SHA with the new header.
    # The old trailer checksum (last oid_length bytes) is excluded.
    new_sha = compute_file_sha(
        f, hash_func=object_format.hash_func, end_ofs=-object_format.oid_length
    )

    # Must reposition before writing (http://bugs.python.org/issue3207)
    f.seek(0, os.SEEK_CUR)

    extra_entries = []

    # Complete the pack.
    for i, object_id in enumerate(object_ids):
        if progress is not None:
            progress(
                (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii")
            )
        assert len(object_id) == object_format.oid_length
        type_num, data = get_raw(object_id)
        # The write position is the object's offset in the extended pack.
        offset = f.tell()
        crc32 = write_pack_object(
            f.write,
            type_num,
            [data],  # Convert bytes to list[bytes]
            sha=new_sha,
            compression_level=compression_level,
            object_format=object_format,
        )
        extra_entries.append((object_id, offset, crc32))
    # Write the recomputed trailer checksum (replaces the old one, which
    # was overwritten by the appended objects).
    pack_sha = new_sha.digest()
    f.write(pack_sha)
    return pack_sha, extra_entries

4538 

4539 

4540try: 

4541 from dulwich._pack import ( # type: ignore 

4542 apply_delta, 

4543 bisect_find_sha, 

4544 ) 

4545except ImportError: 

4546 pass 

4547 

# Try to import the Rust version of create_delta
try:
    from dulwich._pack import create_delta as _create_delta_rs
except ImportError:
    # Rust extension not built/installed; keep the pure-Python create_delta.
    pass
else:
    # Wrap the Rust version to match the Python API (returns bytes instead of Iterator)
    def _create_delta_rs_wrapper(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
        """Wrapper for Rust create_delta to match Python API."""
        yield _create_delta_rs(base_buf, target_buf)

    # Shadow the pure-Python implementation with the accelerated one.
    create_delta = _create_delta_rs_wrapper