
# pack.py -- For dealing with packed git objects.
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Classes for dealing with packed git objects.

A pack is a compact representation of a bunch of objects, stored
using deltas where possible.

A pack has two parts: the pack file, which stores the data, and an index
that tells you where the data is.

To find an object you look in all of the index files until you find a
match for the object name. The matching index entry gives you an offset
into the corresponding pack file.
"""

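# A minimal usage sketch (illustrative only; the exact Pack constructor
# signature may differ):
#
#   pack = Pack("pack-<hash>")     # basename shared by the .pack/.idx pair
#   for hex_sha in pack.index:     # the index maps object names to offsets
#       obj = pack[hex_sha]        # offset lookup, data read, delta resolution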

__all__ = [
    "DEFAULT_PACK_DELTA_WINDOW_SIZE",
    "DEFAULT_PACK_INDEX_VERSION",
    "DELTA_TYPES",
    "OFS_DELTA",
    "PACK_SPOOL_FILE_MAX_SIZE",
    "REF_DELTA",
    "DeltaChainIterator",
    "FilePackIndex",
    "MemoryPackIndex",
    "ObjectContainer",
    "Pack",
    "PackChunkGenerator",
    "PackData",
    "PackFileDisappeared",
    "PackHint",
    "PackIndex",
    "PackIndex1",
    "PackIndex2",
    "PackIndex3",
    "PackIndexEntry",
    "PackIndexer",
    "PackInflater",
    "PackStreamCopier",
    "PackStreamReader",
    "PackedObjectContainer",
    "SHA1Reader",
    "SHA1Writer",
    "UnpackedObject",
    "UnpackedObjectIterator",
    "UnpackedObjectStream",
    "UnresolvedDeltas",
    "apply_delta",
    "bisect_find_sha",
    "chunks_length",
    "compute_file_sha",
    "deltas_from_sorted_objects",
    "deltify_pack_objects",
    "extend_pack",
    "find_reusable_deltas",
    "full_unpacked_object",
    "generate_unpacked_objects",
    "iter_sha1",
    "load_pack_index",
    "load_pack_index_file",
    "obj_sha",
    "pack_header_chunks",
    "pack_object_chunks",
    "pack_object_header",
    "pack_objects_to_data",
    "read_pack_header",
    "read_zlib_chunks",
    "sort_objects_for_delta",
    "take_msb_bytes",
    "unpack_object",
    "verify_and_read",
    "write_pack",
    "write_pack_data",
    "write_pack_from_container",
    "write_pack_header",
    "write_pack_index",
    "write_pack_object",
    "write_pack_objects",
]

import binascii
from collections import defaultdict, deque
from contextlib import suppress
from io import BytesIO, UnsupportedOperation

try:
    from cdifflib import CSequenceMatcher as SequenceMatcher
except ModuleNotFoundError:
    from difflib import SequenceMatcher

import os
import struct
import sys
import warnings
import zlib
from collections.abc import Callable, Iterable, Iterator, Sequence, Set
from hashlib import sha1, sha256
from itertools import chain
from os import SEEK_CUR, SEEK_END
from struct import unpack_from
from types import TracebackType
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Generic,
    Protocol,
    TypeVar,
)

try:
    import mmap
except ImportError:
    has_mmap = False
else:
    has_mmap = True

if TYPE_CHECKING:
    from _hashlib import HASH as HashObject

    from .bitmap import PackBitmap
    from .commit_graph import CommitGraph
    from .object_store import BaseObjectStore
    from .ref import Ref


# The try/except above does not help on Plan 9: mmap imports successfully
# there but does not work, so disable it explicitly.
if sys.platform == "Plan9":
    has_mmap = False


from .errors import ApplyDeltaError, ChecksumMismatch
from .file import GitFile, _GitFile
from .lru_cache import LRUSizeCache
from .object_format import OBJECT_FORMAT_TYPE_NUMS, SHA1, ObjectFormat
from .objects import (
    ObjectID,
    RawObjectID,
    ShaFile,
    hex_to_sha,
    object_header,
    sha_to_hex,
)

OFS_DELTA = 6
REF_DELTA = 7

DELTA_TYPES = (OFS_DELTA, REF_DELTA)


DEFAULT_PACK_DELTA_WINDOW_SIZE = 10

# Keep pack files under 16Mb in memory, otherwise write them out to disk
PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024

# Default pack index version to use when none is specified
DEFAULT_PACK_INDEX_VERSION = 2


OldUnpackedObject = tuple[bytes | int, list[bytes]] | list[bytes]
ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]]
ProgressFn = Callable[[int, str], None]
PackHint = tuple[int, bytes | None]


def verify_and_read(
    read_func: Callable[[int], bytes],
    expected_hash: bytes,
    hash_algo: str,
    progress: Callable[[bytes], None] | None = None,
) -> Iterator[bytes]:
    """Read from stream, verify hash, then yield verified chunks.

    This function downloads data to a temporary file (in-memory for small
    files, on-disk for large ones) while computing its hash. Only after the
    hash is verified to match expected_hash will it yield any data. This
    prevents corrupted or malicious data from reaching the caller.

    Args:
      read_func: Function to read bytes (like file.read or HTTP response reader)
      expected_hash: Expected hash as hex string bytes (e.g., b'a3b2c1...')
      hash_algo: Hash algorithm name ('sha1' or 'sha256')
      progress: Optional progress callback

    Yields:
      Chunks of verified data (only after hash verification succeeds)

    Raises:
      ValueError: If hash doesn't match or algorithm unsupported
    """
    from tempfile import SpooledTemporaryFile

    from .object_format import OBJECT_FORMATS

    # Get the hash function for this algorithm
    obj_format = OBJECT_FORMATS.get(hash_algo)
    if obj_format is None:
        raise ValueError(f"Unsupported hash algorithm: {hash_algo}")

    hasher = obj_format.new_hash()

    # Download to temporary file (memory or disk) while computing hash
    with SpooledTemporaryFile(
        max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix="dulwich-verify-"
    ) as temp_file:
        # Read data, hash it, and write to temp file
        while True:
            chunk = read_func(65536)  # Read in 64KB chunks
            if not chunk:
                break
            hasher.update(chunk)
            temp_file.write(chunk)

        # Verify hash BEFORE yielding any data
        computed_hash = hasher.hexdigest().encode("ascii")
        if computed_hash != expected_hash:
            raise ValueError(
                f"hash mismatch: expected {expected_hash.decode('ascii')}, "
                f"got {computed_hash.decode('ascii')}"
            )

        # Hash verified! Now read from temp file and yield chunks
        if progress:
            progress(b"Hash verified, processing data\n")

        temp_file.seek(0)
        while True:
            chunk = temp_file.read(65536)
            if not chunk:
                break
            yield chunk

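# A minimal sketch of consuming verify_and_read (assumes the OBJECT_FORMATS
# registry knows the name "sha1"):
#
#   payload = b"example pack bytes"
#   expected = sha1(payload).hexdigest().encode("ascii")
#   data = b"".join(verify_and_read(BytesIO(payload).read, expected, "sha1"))
#   # A corrupted payload raises ValueError before any chunk is yielded.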

class UnresolvedDeltas(Exception):
    """Delta objects could not be resolved."""

    def __init__(self, shas: list[bytes]) -> None:
        """Initialize UnresolvedDeltas exception.

        Args:
          shas: List of SHA hashes for unresolved delta objects
        """
        self.shas = shas


class ObjectContainer(Protocol):
    """Protocol for objects that can contain git objects."""

    def add_object(self, obj: ShaFile) -> None:
        """Add a single object to this object store."""

    def add_objects(
        self,
        objects: Sequence[tuple[ShaFile, str | None]],
        progress: Callable[..., None] | None = None,
    ) -> "Pack | None":
        """Add a set of objects to this object store.

        Args:
          objects: Iterable over a list of (object, path) tuples
          progress: Progress callback for object insertion
        Returns: Optional Pack object of the objects written.
        """

    def __contains__(self, sha1: "ObjectID") -> bool:
        """Check if a hex sha is present."""

    def __getitem__(self, sha1: "ObjectID | RawObjectID") -> ShaFile:
        """Retrieve an object."""

    def get_commit_graph(self) -> "CommitGraph | None":
        """Get the commit graph for this object store.

        Returns:
          CommitGraph object if available, None otherwise
        """
        return None


class PackedObjectContainer(ObjectContainer):
    """Container for objects packed in a pack file."""

    def get_unpacked_object(
        self, sha1: "ObjectID | RawObjectID", *, include_comp: bool = False
    ) -> "UnpackedObject":
        """Get a raw unresolved object.

        Args:
          sha1: SHA-1 hash of the object
          include_comp: Whether to include compressed data

        Returns:
          UnpackedObject instance
        """
        raise NotImplementedError(self.get_unpacked_object)

    def iterobjects_subset(
        self, shas: Iterable["ObjectID"], *, allow_missing: bool = False
    ) -> Iterator[ShaFile]:
        """Iterate over a subset of objects.

        Args:
          shas: Iterable of object SHAs to retrieve
          allow_missing: If True, skip missing objects

        Returns:
          Iterator of ShaFile objects
        """
        raise NotImplementedError(self.iterobjects_subset)

    def iter_unpacked_subset(
        self,
        shas: Iterable["ObjectID | RawObjectID"],
        *,
        include_comp: bool = False,
        allow_missing: bool = False,
        convert_ofs_delta: bool = True,
    ) -> Iterator["UnpackedObject"]:
        """Iterate over unpacked objects from a subset of SHAs.

        Args:
          shas: Set of object SHAs to retrieve
          include_comp: Include compressed data if True
          allow_missing: If True, skip missing objects
          convert_ofs_delta: If True, convert offset deltas to ref deltas

        Returns:
          Iterator of UnpackedObject instances
        """
        raise NotImplementedError(self.iter_unpacked_subset)


class UnpackedObjectStream:
    """Abstract base class for a stream of unpacked objects."""

    def __iter__(self) -> Iterator["UnpackedObject"]:
        """Iterate over unpacked objects."""
        raise NotImplementedError(self.__iter__)

    def __len__(self) -> int:
        """Return the number of objects in the stream."""
        raise NotImplementedError(self.__len__)


def take_msb_bytes(
    read: Callable[[int], bytes], crc32: int | None = None
) -> tuple[list[int], int | None]:
    """Read bytes marked with most significant bit.

    Args:
      read: Read function
      crc32: Optional CRC32 checksum to update

    Returns:
      Tuple of (list of bytes read, updated CRC32 or None)
    """
    ret: list[int] = []
    while len(ret) == 0 or ret[-1] & 0x80:
        b = read(1)
        if crc32 is not None:
            crc32 = binascii.crc32(b, crc32)
        ret.append(ord(b[:1]))
    return ret, crc32

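# Illustrative example of the MSB continuation scheme parsed above (the byte
# values are made up):
#
#   buf = BytesIO(bytes([0x95, 0x2C]))  # MSB set on the first byte only
#   ret, crc = take_msb_bytes(buf.read)
#   # ret == [0x95, 0x2C]; reading stopped at the first byte with MSB clear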

class PackFileDisappeared(Exception):
    """Raised when a pack file unexpectedly disappears."""

    def __init__(self, obj: object) -> None:
        """Initialize PackFileDisappeared exception.

        Args:
          obj: The object that triggered the exception
        """
        self.obj = obj


class UnpackedObject:
    """Class encapsulating an object unpacked from a pack file.

    These objects should only be created from within unpack_object. Most
    members start out as empty and are filled in at various points by
    read_zlib_chunks, unpack_object, DeltaChainIterator, etc.

    End users of this object should take care that the function they're
    getting this object from is guaranteed to set the members they need.
    """

    __slots__ = [
        "_sha",  # Cached binary SHA.
        "comp_chunks",  # Compressed object chunks.
        "crc32",  # CRC32.
        "decomp_chunks",  # Decompressed object chunks.
        "decomp_len",  # Decompressed length of this object.
        "delta_base",  # Delta base offset or SHA.
        "hash_func",  # Hash function to use for computing object IDs.
        "obj_chunks",  # Decompressed and delta-resolved chunks.
        "obj_type_num",  # Type of this object.
        "offset",  # Offset in its pack.
        "pack_type_num",  # Type of this object in the pack (may be a delta).
    ]

    obj_type_num: int | None
    obj_chunks: list[bytes] | None
    delta_base: None | bytes | int
    decomp_chunks: list[bytes]
    comp_chunks: list[bytes] | None
    decomp_len: int | None
    crc32: int | None
    offset: int | None
    pack_type_num: int
    _sha: bytes | None
    hash_func: Callable[[], "HashObject"]

    # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be
    # methods of this object.
    def __init__(
        self,
        pack_type_num: int,
        *,
        delta_base: None | bytes | int = None,
        decomp_len: int | None = None,
        crc32: int | None = None,
        sha: bytes | None = None,
        decomp_chunks: list[bytes] | None = None,
        offset: int | None = None,
        hash_func: Callable[[], "HashObject"] = sha1,
    ) -> None:
        """Initialize an UnpackedObject.

        Args:
          pack_type_num: Type number of this object in the pack
          delta_base: Delta base (offset or SHA) if this is a delta object
          decomp_len: Decompressed length of this object
          crc32: CRC32 checksum
          sha: SHA hash of the object
          decomp_chunks: Decompressed chunks
          offset: Offset in the pack file
          hash_func: Hash function to use (defaults to sha1)
        """
        self.offset = offset
        self._sha = sha
        self.pack_type_num = pack_type_num
        self.delta_base = delta_base
        self.comp_chunks = None
        self.decomp_chunks: list[bytes] = decomp_chunks or []
        if decomp_chunks is not None and decomp_len is None:
            self.decomp_len = sum(map(len, decomp_chunks))
        else:
            self.decomp_len = decomp_len
        self.crc32 = crc32
        self.hash_func = hash_func

        if pack_type_num in DELTA_TYPES:
            self.obj_type_num = None
            self.obj_chunks = None
        else:
            self.obj_type_num = pack_type_num
            self.obj_chunks = self.decomp_chunks

    def sha(self) -> RawObjectID:
        """Return the binary SHA of this object."""
        if self._sha is None:
            assert self.obj_type_num is not None and self.obj_chunks is not None
            self._sha = obj_sha(self.obj_type_num, self.obj_chunks, self.hash_func)
        return RawObjectID(self._sha)

    def sha_file(self) -> ShaFile:
        """Return a ShaFile from this object."""
        assert self.obj_type_num is not None and self.obj_chunks is not None
        return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks)

    # Only provided for backwards compatibility with code that expects either
    # chunks or a delta tuple.
    def _obj(self) -> OldUnpackedObject:
        """Return the decompressed chunks, or (delta base, delta chunks)."""
        if self.pack_type_num in DELTA_TYPES:
            assert isinstance(self.delta_base, (bytes, int))
            return (self.delta_base, self.decomp_chunks)
        else:
            return self.decomp_chunks

    def __eq__(self, other: object) -> bool:
        """Check equality with another UnpackedObject."""
        if not isinstance(other, UnpackedObject):
            return False
        for slot in self.__slots__:
            if getattr(self, slot) != getattr(other, slot):
                return False
        return True

    def __ne__(self, other: object) -> bool:
        """Check inequality with another UnpackedObject."""
        return not (self == other)

    def __repr__(self) -> str:
        """Return string representation of this UnpackedObject."""
        data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__]
        return "{}({})".format(self.__class__.__name__, ", ".join(data))


_ZLIB_BUFSIZE = 65536  # 64KB buffer for better I/O performance

# Default maximum memory for caching delta base objects (matches Git's default
# for core.deltaBaseCacheLimit).
DEFAULT_DELTA_BASE_CACHE_LIMIT = 96 * 1024 * 1024  # 96 MiB


def read_zlib_chunks(
    read_some: Callable[[int], bytes],
    unpacked: UnpackedObject,
    include_comp: bool = False,
    buffer_size: int = _ZLIB_BUFSIZE,
) -> bytes:
    """Read zlib data from a buffer.

    This function requires that the buffer have additional data following the
    compressed data, which is guaranteed to be the case for git pack files.

    Args:
      read_some: Read function that returns at least one byte, but may
        return less than the requested size.
      unpacked: An UnpackedObject to write result data to. If its crc32
        attr is not None, the CRC32 of the compressed bytes will be computed
        using this starting CRC32.
        After this function, will have the following attrs set:
        * comp_chunks (if include_comp is True)
        * decomp_chunks
        * decomp_len
        * crc32
      include_comp: If True, include compressed data in the result.
      buffer_size: Size of the read buffer.
    Returns: Leftover unused data from the decompression.

    Raises:
      zlib.error: if a decompression error occurred.
    """
    if unpacked.decomp_len is None or unpacked.decomp_len < 0:
        raise ValueError("non-negative zlib data stream size expected")
    decomp_obj = zlib.decompressobj()

    comp_chunks = []
    decomp_chunks = unpacked.decomp_chunks
    decomp_len = 0
    crc32 = unpacked.crc32

    while True:
        add = read_some(buffer_size)
        if not add:
            raise zlib.error("EOF before end of zlib stream")
        comp_chunks.append(add)
        decomp = decomp_obj.decompress(add)
        decomp_len += len(decomp)
        decomp_chunks.append(decomp)
        unused = decomp_obj.unused_data
        if unused:
            left = len(unused)
            if crc32 is not None:
                crc32 = binascii.crc32(add[:-left], crc32)
            if include_comp:
                comp_chunks[-1] = add[:-left]
            break
        elif crc32 is not None:
            crc32 = binascii.crc32(add, crc32)
    if crc32 is not None:
        crc32 &= 0xFFFFFFFF

    if decomp_len != unpacked.decomp_len:
        raise zlib.error("decompressed data does not match expected size")

    unpacked.crc32 = crc32
    if include_comp:
        unpacked.comp_chunks = comp_chunks
    return unused

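# Minimal sketch of read_zlib_chunks on a self-contained buffer (pack type
# number 1, a commit, is used purely for illustration):
#
#   stream = BytesIO(zlib.compress(b"hello") + b"extra")
#   unpacked = UnpackedObject(1, decomp_len=5)
#   leftover = read_zlib_chunks(stream.read, unpacked)
#   # b"".join(unpacked.decomp_chunks) == b"hello" and leftover == b"extra"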

def iter_sha1(iter: Iterable[bytes]) -> bytes:
    """Return the hexdigest of the SHA1 over a set of names.

    Args:
      iter: Iterator over string objects
    Returns: 40-byte hex sha1 digest
    """
    sha = sha1()
    for name in iter:
        sha.update(name)
    return sha.hexdigest().encode("ascii")

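# For example, iter_sha1([b"abc", b"def"]) equals
# sha1(b"abcdef").hexdigest().encode("ascii"): the names are hashed in
# iteration order and the hex digest is returned as ASCII bytes.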

def load_pack_index(
    path: str | os.PathLike[str], object_format: ObjectFormat
) -> "PackIndex":
    """Load an index file by path.

    Args:
      path: Path to the index file
      object_format: Hash algorithm used by the repository
    Returns: A PackIndex loaded from the given path
    """
    with GitFile(path, "rb") as f:
        return load_pack_index_file(path, f, object_format)


def _load_file_contents(
    f: IO[bytes] | _GitFile, size: int | None = None
) -> tuple[bytes | Any, int]:
    """Load contents from a file, preferring mmap when possible.

    Args:
      f: File-like object to load
      size: Expected size, or None to determine from file
    Returns: Tuple of (contents, size)
    """
    try:
        fd = f.fileno()
    except (UnsupportedOperation, AttributeError):
        fd = None
    # Attempt to use mmap if possible
    if fd is not None:
        if size is None:
            size = os.fstat(fd).st_size
        if has_mmap:
            try:
                contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ)
            except (OSError, ValueError):
                # Can't mmap - perhaps a socket or invalid file descriptor
                pass
            else:
                return contents, size
    contents_bytes = f.read()
    size = len(contents_bytes)
    return contents_bytes, size


def load_pack_index_file(
    path: str | os.PathLike[str],
    f: IO[bytes] | _GitFile,
    object_format: ObjectFormat,
) -> "PackIndex":
    """Load an index file from a file-like object.

    Args:
      path: Path for the index file
      f: File-like object
      object_format: Hash algorithm used by the repository
    Returns: A PackIndex loaded from the given file
    """
    contents, size = _load_file_contents(f)
    if contents[:4] == b"\377tOc":
        version = struct.unpack(b">L", contents[4:8])[0]
        if version == 2:
            return PackIndex2(
                path,
                object_format,
                file=f,
                contents=contents,
                size=size,
            )
        elif version == 3:
            return PackIndex3(path, object_format, file=f, contents=contents, size=size)
        else:
            raise KeyError(f"Unknown pack index format {version}")
    else:
        return PackIndex1(path, object_format, file=f, contents=contents, size=size)


def bisect_find_sha(
    start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes]
) -> int | None:
    """Find a SHA in a data blob with sorted SHAs.

    Args:
      start: Start index of range to search
      end: End index of range to search
      sha: Sha to find
      unpack_name: Callback to retrieve SHA by index
    Returns: Index of the SHA, or None if it wasn't found
    """
    assert start <= end
    while start <= end:
        i = (start + end) // 2
        file_sha = unpack_name(i)
        if file_sha < sha:
            start = i + 1
        elif file_sha > sha:
            end = i - 1
        else:
            return i
    return None

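# Illustrative example: both bounds are inclusive and unpack_name is just an
# index-to-SHA callback, so a sorted list works directly.
#
#   shas = [b"\x01" * 20, b"\x05" * 20, b"\x09" * 20]
#   bisect_find_sha(0, len(shas) - 1, b"\x05" * 20, shas.__getitem__)  # -> 1
#   bisect_find_sha(0, len(shas) - 1, b"\x02" * 20, shas.__getitem__)  # -> None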

PackIndexEntry = tuple[RawObjectID, int, int | None]


class PackIndex:
    """An index into a packfile.

    Given the sha id of an object, a pack index can tell you the location of
    that object in the packfile, if the pack contains it.
    """

    object_format: "ObjectFormat"

    def __eq__(self, other: object) -> bool:
        """Check equality with another PackIndex."""
        if not isinstance(other, PackIndex):
            return False

        for (name1, _, _), (name2, _, _) in zip(
            self.iterentries(), other.iterentries()
        ):
            if name1 != name2:
                return False
        return True

    def __ne__(self, other: object) -> bool:
        """Check if this pack index is not equal to another."""
        return not self.__eq__(other)

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        raise NotImplementedError(self.__len__)

    def __iter__(self) -> Iterator[ObjectID]:
        """Iterate over the SHAs in this pack."""
        return map(lambda sha: sha_to_hex(RawObjectID(sha)), self._itersha())

    def iterentries(self) -> Iterator[PackIndexEntry]:
        """Iterate over the entries in this pack index.

        Returns: iterator over tuples with object name, offset in packfile and
          crc32 checksum.
        """
        raise NotImplementedError(self.iterentries)

    def get_pack_checksum(self) -> bytes | None:
        """Return the SHA1 checksum stored for the corresponding packfile.

        Returns: 20-byte binary digest, or None if not available
        """
        raise NotImplementedError(self.get_pack_checksum)

    def object_offset(self, sha: ObjectID | RawObjectID) -> int:
        """Return the offset into the corresponding packfile for the object.

        Given the name of an object it will return the offset that object
        lives at within the corresponding pack file.

        Raises:
          KeyError: if the pack file does not contain the object.
        """
        raise NotImplementedError(self.object_offset)

    def object_sha1(self, index: int) -> bytes:
        """Return the SHA1 for the object stored at the given pack offset."""
        for name, offset, _crc32 in self.iterentries():
            if offset == index:
                return name
        else:
            raise KeyError(index)

    def _object_offset(self, sha: bytes) -> int:
        """See object_offset.

        Args:
          sha: A *binary* SHA digest (20 bytes for SHA-1).
        """
        raise NotImplementedError(self._object_offset)

    def objects_sha1(self) -> bytes:
        """Return the hex SHA1 over all the shas of all objects in this pack.

        Note: This is used for the filename of the pack.
        """
        return iter_sha1(self._itersha())

    def _itersha(self) -> Iterator[bytes]:
        """Yield all the SHA1's of the objects in the index, sorted."""
        raise NotImplementedError(self._itersha)

    def iter_prefix(self, prefix: bytes) -> Iterator[RawObjectID]:
        """Iterate over all SHA1s with the given prefix.

        Args:
          prefix: Binary prefix to match
        Returns: Iterator of matching SHA1s
        """
        # Default implementation for PackIndex classes that don't override
        for sha, _, _ in self.iterentries():
            if sha.startswith(prefix):
                yield RawObjectID(sha)

    def close(self) -> None:
        """Close any open files."""

    def check(self) -> None:
        """Check the consistency of this pack index."""


class MemoryPackIndex(PackIndex):
    """Pack index that is stored entirely in memory."""

    def __init__(
        self,
        entries: list[PackIndexEntry],
        object_format: ObjectFormat,
        pack_checksum: bytes | None = None,
    ) -> None:
        """Create a new MemoryPackIndex.

        Args:
          entries: Sequence of name, idx, crc32 (sorted)
          object_format: Object format used by this index
          pack_checksum: Optional pack checksum
        """
        self._by_sha = {}
        self._by_offset = {}
        for name, offset, _crc32 in entries:
            self._by_sha[name] = offset
            self._by_offset[offset] = name
        self._entries = entries
        self._pack_checksum = pack_checksum
        self.object_format = object_format

    def get_pack_checksum(self) -> bytes | None:
        """Return the SHA checksum stored for the corresponding packfile."""
        return self._pack_checksum

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        return len(self._entries)

    def object_offset(self, sha: ObjectID | RawObjectID) -> int:
        """Return the offset for the given SHA.

        Args:
          sha: SHA to look up (binary or hex)
        Returns: Offset in the pack file
        """
        lookup_sha: RawObjectID
        if len(sha) == self.object_format.hex_length:
            lookup_sha = hex_to_sha(ObjectID(sha))
        else:
            lookup_sha = RawObjectID(sha)
        return self._by_sha[lookup_sha]

    def object_sha1(self, offset: int) -> bytes:
        """Return the SHA1 for the object at the given offset."""
        return self._by_offset[offset]

    def _itersha(self) -> Iterator[bytes]:
        """Iterate over all SHA1s in the index."""
        return iter(self._by_sha)

    def iterentries(self) -> Iterator[PackIndexEntry]:
        """Iterate over all index entries."""
        return iter(self._entries)

    @classmethod
    def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex":
        """Create a MemoryPackIndex from a PackData object."""
        return MemoryPackIndex(
            list(pack_data.sorted_entries()),
            pack_checksum=pack_data.get_stored_checksum(),
            object_format=pack_data.object_format,
        )

    @classmethod
    def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex":
        """Create a copy of another PackIndex in memory."""
        return cls(
            list(other_index.iterentries()),
            other_index.object_format,
            other_index.get_pack_checksum(),
        )


class FilePackIndex(PackIndex):
    """Pack index that is based on a file.

    The file starts with a fan-out table: 256 four-byte entries, indexed by
    the first byte of a sha id. The entry for byte value b is the end of the
    group of entries whose shas start with b; the entry for b - 1 marks the
    start of that group. Within a group the entries are sorted by sha id, so
    a lookup computes the start and end offsets from the fan-out table and
    then bisects between them to find whether the sha is present.
    """

    _fan_out_table: list[int]
    _file: IO[bytes] | _GitFile

    def __init__(
        self,
        filename: str | os.PathLike[str],
        file: IO[bytes] | _GitFile | None = None,
        contents: "bytes | mmap.mmap | None" = None,
        size: int | None = None,
    ) -> None:
        """Create a pack index object.

        Provide it with the name of the index file to consider, and it will
        map it whenever required.
        """
        self._filename = filename
        # Take the size now, so it can be checked each time we map the file to
        # ensure that it hasn't changed.
        if file is None:
            self._file = GitFile(filename, "rb")
        else:
            self._file = file
        if contents is None:
            self._contents, self._size = _load_file_contents(self._file, size)
        else:
            self._contents = contents
            self._size = size if size is not None else len(contents)

    @property
    def path(self) -> str:
        """Return the path to this index file."""
        return os.fspath(self._filename)

    def __eq__(self, other: object) -> bool:
        """Check equality with another FilePackIndex."""
        # Quick optimization:
        if (
            isinstance(other, FilePackIndex)
            and self._fan_out_table != other._fan_out_table
        ):
            return False

        return super().__eq__(other)

    def close(self) -> None:
        """Close the underlying file and any mmap."""
        self._file.close()
        close_fn = getattr(self._contents, "close", None)
        if close_fn is not None:
            close_fn()

    def __len__(self) -> int:
        """Return the number of entries in this pack index."""
        return self._fan_out_table[-1]

    def _unpack_entry(self, i: int) -> PackIndexEntry:
        """Unpack the i-th entry in the index file.

        Returns: Tuple with object name (SHA), offset in pack file and CRC32
          checksum (if known).
        """
        raise NotImplementedError(self._unpack_entry)

    def _unpack_name(self, i: int) -> bytes:
        """Unpack the i-th name from the index file."""
        raise NotImplementedError(self._unpack_name)

    def _unpack_offset(self, i: int) -> int:
        """Unpack the i-th object offset from the index file."""
        raise NotImplementedError(self._unpack_offset)

    def _unpack_crc32_checksum(self, i: int) -> int | None:
        """Unpack the crc32 checksum for the ith object from the index file."""
        raise NotImplementedError(self._unpack_crc32_checksum)

    def _itersha(self) -> Iterator[bytes]:
        """Iterate over all SHA1s in the index."""
        for i in range(len(self)):
            yield self._unpack_name(i)

    def iterentries(self) -> Iterator[PackIndexEntry]:
        """Iterate over the entries in this pack index.

        Returns: iterator over tuples with object name, offset in packfile and
          crc32 checksum.
        """
        for i in range(len(self)):
            yield self._unpack_entry(i)

    def _read_fan_out_table(self, start_offset: int) -> list[int]:
        """Read the fan-out table from the index.

        The fan-out table contains 256 entries mapping first byte values
        to the number of objects with SHA1s less than or equal to that byte.

        Args:
          start_offset: Offset in the file where the fan-out table starts
        Returns: List of 256 integers
        """
        ret = []
        for i in range(0x100):
            fanout_entry = self._contents[
                start_offset + i * 4 : start_offset + (i + 1) * 4
            ]
            ret.append(struct.unpack(">L", fanout_entry)[0])
        return ret

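    # Illustrative fan-out lookup using the table read above: objects whose
    # first SHA byte is b occupy index slots [F[b-1], F[b]) (or [0, F[0]) for
    # b == 0), so a search only needs to bisect within that slice:
    #
    #   b = sha[0]
    #   start = 0 if b == 0 else fan_out[b - 1]
    #   end = fan_out[b]
    #   # bisect for sha within index positions [start, end)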

    def check(self) -> None:
        """Check that the stored checksum matches the actual checksum."""
        actual = self.calculate_checksum()
        stored = self.get_stored_checksum()
        if actual != stored:
            raise ChecksumMismatch(stored, actual)

    def calculate_checksum(self) -> bytes:
        """Calculate the SHA1 checksum over this pack index.

        Returns: This is a 20-byte binary digest
        """
        return sha1(self._contents[:-20]).digest()

    def get_pack_checksum(self) -> bytes:
        """Return the SHA1 checksum stored for the corresponding packfile.

        Returns: 20-byte binary digest
        """
        return bytes(self._contents[-40:-20])

    def get_stored_checksum(self) -> bytes:
        """Return the SHA1 checksum stored for this index.

        Returns: 20-byte binary digest
        """
        return bytes(self._contents[-20:])

    def object_offset(self, sha: ObjectID | RawObjectID) -> int:
        """Return the offset into the corresponding packfile for the object.

        Given the name of an object it will return the offset that object
        lives at within the corresponding pack file.

        Raises:
          KeyError: if the pack file does not contain the object.
        """
        lookup_sha: RawObjectID
        if len(sha) == self.object_format.hex_length:  # hex string
            lookup_sha = hex_to_sha(ObjectID(sha))
        else:
            lookup_sha = RawObjectID(sha)
        try:
            return self._object_offset(lookup_sha)
        except ValueError as exc:
            closed = getattr(self._contents, "closed", None)
            if closed in (None, True):
                raise PackFileDisappeared(self) from exc
            raise

    def _object_offset(self, sha: bytes) -> int:
        """See object_offset.

        Args:
          sha: A *binary* SHA digest (20 bytes for SHA-1).
        """
        hash_size = getattr(self, "hash_size", 20)  # Default to SHA1 for v1
        assert len(sha) == hash_size
        idx = ord(sha[:1])
        if idx == 0:
            start = 0
        else:
            start = self._fan_out_table[idx - 1]
        end = self._fan_out_table[idx]
        i = bisect_find_sha(start, end, sha, self._unpack_name)
        if i is None:
            raise KeyError(sha)
        return self._unpack_offset(i)

    def iter_prefix(self, prefix: bytes) -> Iterator[RawObjectID]:
        """Iterate over all SHA1s with the given prefix."""
        first = ord(prefix[:1])
        if first == 0:
            start = 0
        else:
            start = self._fan_out_table[first - 1]
        end = first + 1
        if end == 0x100:
            end = len(self)
        else:
            end = self._fan_out_table[end]
        assert start <= end
        started = False
        for i in range(start, end):
            name: bytes = self._unpack_name(i)
            if name.startswith(prefix):
                yield RawObjectID(name)
                started = True
            elif started:
                break


class PackIndex1(FilePackIndex):
    """Version 1 Pack Index file."""

    object_format = SHA1

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 1 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        """
        super().__init__(filename, file, contents, size)

        # PackIndex1 only supports SHA1
        if object_format != SHA1:
            raise AssertionError(
                f"PackIndex1 only supports SHA1, not {object_format.name}"
            )

        self.object_format = object_format
        self.version = 1
        self._fan_out_table = self._read_fan_out_table(0)
        self.hash_size = self.object_format.oid_length
        self._entry_size = 4 + self.hash_size

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, None]:
        """Unpack the i-th (name, offset, crc32) entry; v1 stores no CRC32."""
        base_offset = (0x100 * 4) + (i * self._entry_size)
        offset = unpack_from(">L", self._contents, base_offset)[0]
        name = self._contents[base_offset + 4 : base_offset + 4 + self.hash_size]
        return (RawObjectID(name), offset, None)

    def _unpack_name(self, i: int) -> bytes:
        """Unpack the i-th object name from the index file."""
        offset = (0x100 * 4) + (i * self._entry_size) + 4
        return self._contents[offset : offset + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Unpack the i-th object offset from the index file."""
        offset = (0x100 * 4) + (i * self._entry_size)
        return int(unpack_from(">L", self._contents, offset)[0])

    def _unpack_crc32_checksum(self, i: int) -> None:
        """Return None; CRC32 checksums are not stored in v1 index files."""
        return None


class PackIndex2(FilePackIndex):
    """Version 2 Pack Index file."""

    object_format = SHA1

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 2 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        """
        super().__init__(filename, file, contents, size)
        self.object_format = object_format
        if self._contents[:4] != b"\377tOc":
            raise AssertionError("Not a v2 pack index file")
        (self.version,) = unpack_from(b">L", self._contents, 4)
        if self.version != 2:
            raise AssertionError(f"Version was {self.version}")
        self._fan_out_table = self._read_fan_out_table(8)
        self.hash_size = self.object_format.oid_length
        self._name_table_offset = 8 + 0x100 * 4
        self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self)
        self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
        self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
            self
        )

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, int]:
        """Unpack the i-th (name, offset, crc32) entry from the index file."""
        return (
            RawObjectID(self._unpack_name(i)),
            self._unpack_offset(i),
            self._unpack_crc32_checksum(i),
        )

    def _unpack_name(self, i: int) -> bytes:
        """Unpack the i-th object name from the index file."""
        offset = self._name_table_offset + i * self.hash_size
        return self._contents[offset : offset + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Unpack the i-th object offset, following the large-offset table."""
        offset = self._pack_offset_table_offset + i * 4
        offset_val = int(unpack_from(">L", self._contents, offset)[0])
        if offset_val & (2**31):
            offset = (
                self._pack_offset_largetable_offset + (offset_val & (2**31 - 1)) * 8
            )
            offset_val = int(unpack_from(">Q", self._contents, offset)[0])
        return offset_val

    def _unpack_crc32_checksum(self, i: int) -> int:
        """Unpack the CRC32 checksum for the i-th object."""
        return int(
            unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
        )

    def get_pack_checksum(self) -> bytes:
        """Return the checksum stored for the corresponding packfile.

        Returns: binary digest (size depends on hash algorithm)
        """
        # Index ends with: pack_checksum + index_checksum
        # Each checksum is hash_size bytes
        checksum_size = self.hash_size
        return bytes(self._contents[-2 * checksum_size : -checksum_size])

    def get_stored_checksum(self) -> bytes:
        """Return the checksum stored for this index.

        Returns: binary digest (size depends on hash algorithm)
        """
        checksum_size = self.hash_size
        return bytes(self._contents[-checksum_size:])

    def calculate_checksum(self) -> bytes:
        """Calculate the checksum over this pack index.

        Returns: binary digest (size depends on hash algorithm)
        """
        # Determine hash function based on hash_size
        if self.hash_size == 20:
            hash_func = sha1
        elif self.hash_size == 32:
            hash_func = sha256
        else:
            raise ValueError(f"Unsupported hash size: {self.hash_size}")

        return hash_func(self._contents[: -self.hash_size]).digest()

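# Note on the v2/v3 offset encoding handled by _unpack_offset above: a 4-byte
# entry with its most significant bit set is not an offset but an index into
# the table of 8-byte large offsets. Illustrative arithmetic:
#
#   entry = 0x80000002           # MSB set -> large-offset slot 2
#   slot = entry & (2**31 - 1)   # == 2
#   # real offset = the 8-byte big-endian value at largetable + slot * 8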

class PackIndex3(FilePackIndex):
    """Version 3 Pack Index file.

    Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes).
    """

    def __init__(
        self,
        filename: str | os.PathLike[str],
        object_format: ObjectFormat,
        file: IO[bytes] | _GitFile | None = None,
        contents: bytes | None = None,
        size: int | None = None,
    ) -> None:
        """Initialize a version 3 pack index.

        Args:
          filename: Path to the index file
          object_format: Object format used by the repository
          file: Optional file object
          contents: Optional mmap'd contents
          size: Optional size of the index
        """
        super().__init__(filename, file, contents, size)
        if self._contents[:4] != b"\377tOc":
            raise AssertionError("Not a v3 pack index file")
        (self.version,) = unpack_from(b">L", self._contents, 4)
        if self.version != 3:
            raise AssertionError(f"Version was {self.version}")

        # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
        (self.hash_format,) = unpack_from(b">L", self._contents, 8)
        file_object_format = OBJECT_FORMAT_TYPE_NUMS[self.hash_format]

        # Verify provided object_format matches what's in the file
        if object_format != file_object_format:
            raise AssertionError(
                f"Object format mismatch: provided {object_format.name}, "
                f"but file contains {file_object_format.name}"
            )

        self.object_format = object_format
        self.hash_size = self.object_format.oid_length

        # Read length of shortened object names
        (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12)

        # Calculate offsets based on variable hash size
        self._fan_out_table = self._read_fan_out_table(
            16
        )  # After header (4 + 4 + 4 + 4)
        self._name_table_offset = 16 + 0x100 * 4
        self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self)
        self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
        self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
            self
        )

    def _unpack_entry(self, i: int) -> tuple[RawObjectID, int, int]:
        """Unpack the i-th (name, offset, crc32) entry from the index file."""
        return (
            RawObjectID(self._unpack_name(i)),
            self._unpack_offset(i),
            self._unpack_crc32_checksum(i),
        )

    def _unpack_name(self, i: int) -> bytes:
        """Unpack the i-th object name from the index file."""
        offset = self._name_table_offset + i * self.hash_size
        return self._contents[offset : offset + self.hash_size]

    def _unpack_offset(self, i: int) -> int:
        """Unpack the i-th object offset, following the large-offset table."""
        offset_pos = self._pack_offset_table_offset + i * 4
        offset = unpack_from(">L", self._contents, offset_pos)[0]
        assert isinstance(offset, int)
        if offset & (2**31):
            large_offset_pos = (
                self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
            )
            offset = unpack_from(">Q", self._contents, large_offset_pos)[0]
            assert isinstance(offset, int)
        return offset

    def _unpack_crc32_checksum(self, i: int) -> int:
        """Unpack the CRC32 checksum for the i-th object."""
        result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
        assert isinstance(result, int)
        return result


def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]:
    """Read the header of a pack file.

    Args:
      read: Read function
    Returns: Tuple of (pack version, number of objects).

    Raises:
      AssertionError: if the data is too short or is not a valid pack header.
    """
    header = read(12)
    if not header:
        raise AssertionError("file too short to contain pack")
    if header[:4] != b"PACK":
        raise AssertionError(f"Invalid pack header {header!r}")
    (version,) = unpack_from(b">L", header, 4)
    if version not in (2, 3):
        raise AssertionError(f"Version was {version}")
    (num_objects,) = unpack_from(b">L", header, 8)
    return (version, num_objects)

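# Illustrative round trip for read_pack_header: a well-formed 12-byte header
# for a version-2 pack containing three objects.
#
#   header = b"PACK" + struct.pack(">LL", 2, 3)
#   read_pack_header(BytesIO(header).read)  # -> (2, 3)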

def chunks_length(chunks: bytes | Iterable[bytes]) -> int:
    """Get the total length of a sequence of chunks.

    Args:
      chunks: Either a single bytes object or an iterable of bytes
    Returns: Total length in bytes
    """
    if isinstance(chunks, bytes):
        return len(chunks)
    else:
        return sum(map(len, chunks))


def unpack_object(
    read_all: Callable[[int], bytes],
    hash_func: Callable[[], "HashObject"],
    read_some: Callable[[int], bytes] | None = None,
    compute_crc32: bool = False,
    include_comp: bool = False,
    zlib_bufsize: int = _ZLIB_BUFSIZE,
) -> tuple[UnpackedObject, bytes]:
    """Unpack a Git object.

    Args:
      read_all: Read function that blocks until the number of requested
        bytes are read.
      hash_func: Hash function to use for computing object IDs.
      read_some: Read function that returns at least one byte, but may not
        return the number of bytes requested.
      compute_crc32: If True, compute the CRC32 of the compressed data. If
        False, the returned CRC32 will be None.
      include_comp: If True, include compressed data in the result.
      zlib_bufsize: An optional buffer size for zlib operations.
    Returns: A tuple of (unpacked, unused), where unused is the unused data
      leftover from decompression, and unpacked is an UnpackedObject with
      the following attrs set:

      * obj_chunks (for non-delta types)
      * pack_type_num
      * delta_base (for delta types)
      * comp_chunks (if include_comp is True)
      * decomp_chunks
      * decomp_len
      * crc32 (if compute_crc32 is True)
    """
    if read_some is None:
        read_some = read_all
    if compute_crc32:
        crc32 = 0
    else:
        crc32 = None

    raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
    type_num = (raw[0] >> 4) & 0x07
    size = raw[0] & 0x0F
    for i, byte in enumerate(raw[1:]):
        size += (byte & 0x7F) << ((i * 7) + 4)

    delta_base: int | bytes | None
    raw_base = len(raw)
    if type_num == OFS_DELTA:
        raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
        raw_base += len(raw)
        if raw[-1] & 0x80:
            raise AssertionError("last delta base offset byte has MSB set")
        delta_base_offset = raw[0] & 0x7F
        for byte in raw[1:]:
            delta_base_offset += 1
            delta_base_offset <<= 7
            delta_base_offset += byte & 0x7F
        delta_base = delta_base_offset
    elif type_num == REF_DELTA:
        # Determine hash size from hash_func
        hash_size = len(hash_func().digest())
        delta_base_obj = read_all(hash_size)
        if crc32 is not None:
            crc32 = binascii.crc32(delta_base_obj, crc32)
        delta_base = delta_base_obj
        raw_base += hash_size
    else:
        delta_base = None

    unpacked = UnpackedObject(
        type_num,
        delta_base=delta_base,
        decomp_len=size,
        crc32=crc32,
        hash_func=hash_func,
    )
    unused = read_zlib_chunks(
        read_some,
        unpacked,
        buffer_size=zlib_bufsize,
        include_comp=include_comp,
    )
    return unpacked, unused

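# Minimal sketch of unpack_object on a hand-built non-delta entry (type 3,
# a blob, whose 5-byte size fits in the low nibble of a single header byte):
#
#   body = zlib.compress(b"hello")
#   header = bytes([(3 << 4) | 5])  # type in bits 4-6, size in the low nibble
#   f = BytesIO(header + body + b"trailing")
#   unpacked, unused = unpack_object(f.read, sha1)
#   # b"".join(unpacked.decomp_chunks) == b"hello" and unused == b"trailing"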

1463def _compute_object_size(value: tuple[int, Any]) -> int: 

1464 """Compute the size of a unresolved object for use with LRUSizeCache.""" 

1465 (num, obj) = value 

1466 if num in DELTA_TYPES: 

1467 return chunks_length(obj[1]) 

1468 return chunks_length(obj) 

1469 

1470 

1471class PackStreamReader: 

1472 """Class to read a pack stream. 

1473 

1474 The pack is read from a ReceivableProtocol using read() or recv() as 

1475 appropriate. 

1476 """ 

1477 

1478 def __init__( 

1479 self, 

1480 hash_func: Callable[[], "HashObject"], 

1481 read_all: Callable[[int], bytes], 

1482 read_some: Callable[[int], bytes] | None = None, 

1483 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1484 ) -> None: 

1485 """Initialize pack stream reader. 

1486 

1487 Args: 

1488 hash_func: Hash function to use for computing object IDs 

1489 read_all: Function to read all requested bytes 

1490 read_some: Function to read some bytes (optional) 

1491 zlib_bufsize: Buffer size for zlib decompression 

1492 """ 

1493 self.read_all = read_all 

1494 if read_some is None: 

1495 self.read_some = read_all 

1496 else: 

1497 self.read_some = read_some 

1498 self.hash_func = hash_func 

1499 self.sha = hash_func() 

1500 self._hash_size = len(hash_func().digest()) 

1501 self._offset = 0 

1502 self._rbuf = BytesIO() 

1503 # trailer is a deque to avoid memory allocation on small reads 

1504 self._trailer: deque[int] = deque() 

1505 self._zlib_bufsize = zlib_bufsize 

1506 

1507 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1508 """Read up to size bytes using the given callback. 

1509 

1510 As a side effect, update the verifier's hash (excluding the last 

1511 hash_size bytes read, which is the pack checksum). 

1512 

1513 Args: 

1514 read: The read callback to read from. 

1515 size: The maximum number of bytes to read; the particular 

1516 behavior is callback-specific. 

1517 Returns: Bytes read 

1518 """ 

1519 data = read(size) 

1520 

1521 # maintain a trailer of the last hash_size bytes we've read 

1522 n = len(data) 

1523 self._offset += n 

1524 tn = len(self._trailer) 

1525 if n >= self._hash_size: 

1526 to_pop = tn 

1527 to_add = self._hash_size 

1528 else: 

1529 to_pop = max(n + tn - self._hash_size, 0) 

1530 to_add = n 

1531 self.sha.update( 

1532 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)])) 

1533 ) 

1534 self._trailer.extend(data[-to_add:]) 

1535 

1536 # hash everything but the trailer 

1537 self.sha.update(data[:-to_add]) 

1538 return data 

1539 

1540 def _buf_len(self) -> int: 

1541 buf = self._rbuf 

1542 start = buf.tell() 

1543 buf.seek(0, SEEK_END) 

1544 end = buf.tell() 

1545 buf.seek(start) 

1546 return end - start 

1547 

1548 @property 

1549 def offset(self) -> int: 

1550 """Return current offset in the stream.""" 

1551 return self._offset - self._buf_len() 

1552 

1553 def read(self, size: int) -> bytes: 

1554 """Read, blocking until size bytes are read.""" 

1555 buf_len = self._buf_len() 

1556 if buf_len >= size: 

1557 return self._rbuf.read(size) 

1558 buf_data = self._rbuf.read() 

1559 self._rbuf = BytesIO() 

1560 return buf_data + self._read(self.read_all, size - buf_len) 

1561 

1562 def recv(self, size: int) -> bytes: 

1563 """Read up to size bytes, blocking until one byte is read.""" 

1564 buf_len = self._buf_len() 

1565 if buf_len: 

1566 data = self._rbuf.read(size) 

1567 if size >= buf_len: 

1568 self._rbuf = BytesIO() 

1569 return data 

1570 return self._read(self.read_some, size) 

1571 

1572 def __len__(self) -> int: 

1573 """Return the number of objects in this pack.""" 

1574 return self._num_objects 

1575 

1576 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]: 

1577 """Read the objects in this pack file. 

1578 

1579 Args: 

1580 compute_crc32: If True, compute the CRC32 of the compressed 

1581 data. If False, the returned CRC32 will be None. 

1582 Returns: Iterator over UnpackedObjects with the following members set: 

1583 offset 

1584 obj_type_num 

1585 obj_chunks (for non-delta types) 

1586 delta_base (for delta types) 

1587 decomp_chunks 

1588 decomp_len 

1589 crc32 (if compute_crc32 is True) 

1590 

1591 Raises: 

1592 ChecksumMismatch: if the checksum of the pack contents does not 

1593 match the checksum in the pack trailer. 

1594 zlib.error: if an error occurred during zlib decompression. 

1595 IOError: if an error occurred writing to the output file. 

1596 """ 

1597 _pack_version, self._num_objects = read_pack_header(self.read) 

1598 

1599 for _ in range(self._num_objects): 

1600 offset = self.offset 

1601 unpacked, unused = unpack_object( 

1602 self.read, 

1603 self.hash_func, 

1604 read_some=self.recv, 

1605 compute_crc32=compute_crc32, 

1606 zlib_bufsize=self._zlib_bufsize, 

1607 ) 

1608 unpacked.offset = offset 

1609 

1610 # prepend any unused data to current read buffer 

1611 buf = BytesIO() 

1612 buf.write(unused) 

1613 buf.write(self._rbuf.read()) 

1614 buf.seek(0) 

1615 self._rbuf = buf 

1616 

1617 yield unpacked 

1618 

1619 if self._buf_len() < self._hash_size: 

1620 # If the read buffer is full, then the last read() got the whole 

1621 # trailer off the wire. If not, it means there is still some of the 

1622 # trailer to read. We need to read() all hash_size bytes; N come from the 

1623 # read buffer and (hash_size - N) come from the wire. 

1624 self.read(self._hash_size) 

1625 

1626 pack_sha = bytearray(self._trailer) 

1627 if pack_sha != self.sha.digest(): 

1628 raise ChecksumMismatch( 

1629 sha_to_hex(RawObjectID(bytes(pack_sha))), self.sha.hexdigest() 

1630 ) 

1631 

1632 

1633class PackStreamCopier(PackStreamReader): 

1634 """Class to verify a pack stream as it is being read. 

1635 

1636 The pack is read from a ReceivableProtocol using read() or recv() as 

1637 appropriate and written out to the given file-like object. 

1638 """ 

1639 

1640 def __init__( 

1641 self, 

1642 hash_func: Callable[[], "HashObject"], 

1643 read_all: Callable[[int], bytes], 

1644 read_some: Callable[[int], bytes] | None, 

1645 outfile: IO[bytes], 

1646 delta_iter: "DeltaChainIterator[UnpackedObject] | None" = None, 

1647 ) -> None: 

1648 """Initialize the copier. 

1649 

1650 Args: 

1651 hash_func: Hash function to use for computing object IDs 

1652 read_all: Read function that blocks until the number of 

1653 requested bytes are read. 

1654 read_some: Read function that returns at least one byte, but may 

1655 not return the number of bytes requested. 

1656 outfile: File-like object to write output through. 

1657 delta_iter: Optional DeltaChainIterator to record deltas as we 

1658 read them. 

1659 """ 

1660 super().__init__(hash_func, read_all, read_some=read_some) 

1661 self.outfile = outfile 

1662 self._delta_iter = delta_iter 

1663 

1664 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1665 """Read data from the read callback and write it to the file.""" 

1666 data = super()._read(read, size) 

1667 self.outfile.write(data) 

1668 return data 

1669 

1670 def verify(self, progress: Callable[..., None] | None = None) -> None: 

1671 """Verify a pack stream and write it to the output file. 

1672 

1673 See PackStreamReader.iterobjects for a list of exceptions this may 

1674 throw. 

1675 """ 

1676 i = 0 # default count of entries if read_objects() is empty 

1677 for i, unpacked in enumerate(self.read_objects()): 

1678 if self._delta_iter: 

1679 self._delta_iter.record(unpacked) 

1680 if progress is not None: 

1681 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii")) 

1682 if progress is not None: 

1683 progress(f"copied {i} pack entries\n".encode("ascii")) 

1684 

1685 

1686def obj_sha( 

1687 type: int, 

1688 chunks: bytes | Iterable[bytes], 

1689 hash_func: Callable[[], "HashObject"] = sha1, 

1690) -> bytes: 

1691 """Compute the SHA for a numeric type and object chunks. 

1692 

1693 Args: 

1694 type: Object type number 

1695 chunks: Object data chunks 

1696 hash_func: Hash function to use (defaults to sha1) 

1697 

1698 Returns: 

1699 Binary hash digest 

1700 """ 

1701 sha = hash_func() 

1702 sha.update(object_header(type, chunks_length(chunks))) 

1703 if isinstance(chunks, bytes): 

1704 sha.update(chunks) 

1705 else: 

1706 for chunk in chunks: 

1707 sha.update(chunk) 

1708 return sha.digest() 
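
# A minimal sketch: for type 3 (blob), obj_sha reproduces git's object id,
# i.e. sha1 over b"blob <len>\x00" followed by the data.
import hashlib

assert obj_sha(3, [b"hello"]) == hashlib.sha1(b"blob 5\x00hello").digest()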

1709 

1710 

1711def compute_file_sha( 

1712 f: IO[bytes], 

1713 hash_func: Callable[[], "HashObject"], 

1714 start_ofs: int = 0, 

1715 end_ofs: int = 0, 

1716 buffer_size: int = 1 << 16, 

1717) -> "HashObject": 

1718 """Hash a portion of a file into a new SHA. 

1719 

1720 Args: 

1721 f: A file-like object to read from that supports seek(). 

1722 hash_func: A callable that returns a new HashObject. 

1723 start_ofs: The offset in the file to start reading at. 

1724 end_ofs: The offset in the file to end reading at, relative to the 

1725 end of the file. 

1726 buffer_size: A buffer size for reading. 

1727 Returns: A new SHA object updated with data read from the file. 

1728 """ 

1729 sha = hash_func() 

1730 f.seek(0, SEEK_END) 

1731 length = f.tell() 

1732 if start_ofs < 0: 

1733 raise AssertionError(f"start_ofs cannot be negative: {start_ofs}") 

1734 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length: 

1735 raise AssertionError( 

1736 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}" 

1737 ) 

1738 todo = length + end_ofs - start_ofs 

1739 f.seek(start_ofs) 

1740 while todo: 

1741 data = f.read(min(todo, buffer_size)) 

1742 sha.update(data) 

1743 todo -= len(data) 

1744 return sha 
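
# A minimal sketch mirroring PackData.calculate_checksum below: hash a stream
# minus its trailing 20-byte trailer (end_ofs is relative to EOF, so negative).
from io import BytesIO

payload = b"pack payload"
stream = BytesIO(payload + b"\x00" * 20)  # payload followed by a fake trailer
assert compute_file_sha(stream, hash_func=sha1, end_ofs=-20).digest() == sha1(payload).digest()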

1745 

1746 

1747class PackData: 

1748 """The data contained in a packfile. 

1749 

1750 Pack files can be accessed both sequentially for exploding a pack, and 

1751 directly with the help of an index to retrieve a specific object. 

1752 

1753 The objects within are either complete or a delta against another. 

1754 

1755 The header is variable length. If the MSB of each byte is set then it 

1756 indicates that the subsequent byte is still part of the header. 

1757 For the first byte the next 3 MS bits are the type, which tells you the type 

1758 of object, and whether it is a delta. The 4 LS bits are the lowest bits of the 

1759 size. For each subsequent byte the LS 7 bits are the next MS bits of the 

1760 size, i.e. the last byte of the header contains the MS bits of the size. 

1761 

1762 For the complete objects the data is stored as zlib deflated data. 

1763 The size in the header is the uncompressed object size, so to uncompress 

1764 you need to just keep feeding data to zlib until you get an object back, 

1765 or it errors on bad data. This is done here by just giving the complete 

1766 buffer from the start of the deflated object on. This is bad, but until I 

1767 get mmap sorted out it will have to do. 

1768 

1769 Currently there are no integrity checks done. Also no attempt is made to 

1770 try and detect the delta case, or a request for an object at the wrong 

1771 position. It will all just throw a zlib or KeyError. 

1772 """ 

1773 

1774 def __init__( 

1775 self, 

1776 filename: str | os.PathLike[str], 

1777 object_format: ObjectFormat, 

1778 file: IO[bytes] | None = None, 

1779 size: int | None = None, 

1780 *, 

1781 delta_window_size: int | None = None, 

1782 window_memory: int | None = None, 

1783 delta_cache_size: int | None = None, 

1784 depth: int | None = None, 

1785 threads: int | None = None, 

1786 big_file_threshold: int | None = None, 

1787 delta_base_cache_limit: int | None = None, 

1788 ) -> None: 

1789 """Create a PackData object representing the pack in the given filename. 

1790 

1791 The file must exist and stay readable until the object is disposed of. 

1792 It must also stay the same size. It will be mapped whenever needed. 

1793 

1794 Currently there is a restriction on the size of the pack as the python 

1795 mmap implementation is flawed. 

1796 """ 

1797 self._filename = filename 

1798 self.object_format = object_format 

1799 self._size = size 

1800 self._header_size = 12 

1801 self.delta_window_size = delta_window_size 

1802 self.window_memory = window_memory 

1803 self.delta_cache_size = delta_cache_size 

1804 self.depth = depth 

1805 self.threads = threads 

1806 self.big_file_threshold = big_file_threshold 

1807 self.delta_base_cache_limit = delta_base_cache_limit 

1808 self._file: IO[bytes] 

1809 

1810 if file is None: 

1811 self._file = GitFile(self._filename, "rb") 

1812 else: 

1813 self._file = file 

1814 (_version, self._num_objects) = read_pack_header(self._file.read) 

1815 

1816 # Use delta_base_cache_limit, then delta_cache_size, then default 

1817 cache_size = ( 

1818 delta_base_cache_limit or delta_cache_size or DEFAULT_DELTA_BASE_CACHE_LIMIT 

1819 ) 

1820 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]]( 

1821 cache_size, compute_size=_compute_object_size 

1822 ) 

1823 

1824 @property 

1825 def filename(self) -> str: 

1826 """Get the filename of the pack file. 

1827 

1828 Returns: 

1829 Base filename without directory path 

1830 """ 

1831 return os.path.basename(self._filename) 

1832 

1833 @property 

1834 def path(self) -> str | os.PathLike[str]: 

1835 """Get the full path of the pack file. 

1836 

1837 Returns: 

1838 Full path to the pack file 

1839 """ 

1840 return self._filename 

1841 

1842 @classmethod 

1843 def from_file( 

1844 cls, 

1845 file: IO[bytes], 

1846 object_format: ObjectFormat, 

1847 size: int | None = None, 

1848 ) -> "PackData": 

1849 """Create a PackData object from an open file. 

1850 

1851 Args: 

1852 file: Open file object 

1853 object_format: Object format 

1854 size: Optional file size 

1855 

1856 Returns: 

1857 PackData instance 

1858 """ 

1859 return cls(str(file), object_format, file=file, size=size) 

1860 

1861 @classmethod 

1862 def from_path( 

1863 cls, 

1864 path: str | os.PathLike[str], 

1865 object_format: ObjectFormat, 

1866 ) -> "PackData": 

1867 """Create a PackData object from a file path. 

1868 

1869 Args: 

1870 path: Path to the pack file 

1871 object_format: Object format 

1872 

1873 Returns: 

1874 PackData instance 

1875 """ 

1876 return cls(filename=path, object_format=object_format) 

1877 

1878 def close(self) -> None: 

1879 """Close the underlying pack file.""" 

1880 if self._file is not None: 

1881 self._file.close() 

1882 self._file = None # type: ignore 

1883 

1884 def __del__(self) -> None: 

1885 """Ensure pack file is closed when PackData is garbage collected.""" 

1886 if self._file is not None: 

1887 import warnings 

1888 

1889 warnings.warn( 

1890 f"unclosed PackData {self!r}", 

1891 ResourceWarning, 

1892 stacklevel=2, 

1893 source=self, 

1894 ) 

1895 try: 

1896 self.close() 

1897 except Exception: 

1898 # Ignore errors during cleanup 

1899 pass 

1900 

1901 def __enter__(self) -> "PackData": 

1902 """Enter context manager.""" 

1903 return self 

1904 

1905 def __exit__( 

1906 self, 

1907 exc_type: type | None, 

1908 exc_val: BaseException | None, 

1909 exc_tb: TracebackType | None, 

1910 ) -> None: 

1911 """Exit context manager.""" 

1912 self.close() 

1913 

1914 def __eq__(self, other: object) -> bool: 

1915 """Check equality with another object.""" 

1916 if isinstance(other, PackData): 

1917 return self.get_stored_checksum() == other.get_stored_checksum() 

1918 return False 

1919 

1920 def _get_size(self) -> int: 

1921 if self._size is not None: 

1922 return self._size 

1923 self._size = os.path.getsize(self._filename) 

1924 if self._size < self._header_size: 

1925 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})" 

1926 raise AssertionError(errmsg) 

1927 return self._size 

1928 

1929 def __len__(self) -> int: 

1930 """Returns the number of objects in this pack.""" 

1931 return self._num_objects 

1932 

1933 def calculate_checksum(self) -> bytes: 

1934 """Calculate the checksum for this pack. 

1935 

1936 Returns: Binary digest (size depends on hash algorithm) 

1937 """ 

1938 return compute_file_sha( 

1939 self._file, 

1940 hash_func=self.object_format.hash_func, 

1941 end_ofs=-self.object_format.oid_length, 

1942 ).digest() 

1943 

1944 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]: 

1945 """Iterate over unpacked objects in the pack.""" 

1946 self._file.seek(self._header_size) 

1947 

1948 if self._num_objects is None: 

1949 return 

1950 

1951 for _ in range(self._num_objects): 

1952 offset = self._file.tell() 

1953 unpacked, unused = unpack_object( 

1954 self._file.read, 

1955 self.object_format.hash_func, 

1956 compute_crc32=False, 

1957 include_comp=include_comp, 

1958 ) 

1959 unpacked.offset = offset 

1960 yield unpacked 

1961 # Back up over unused data. 

1962 self._file.seek(-len(unused), SEEK_CUR) 

1963 

1964 def iterentries( 

1965 self, 

1966 progress: Callable[[int, int], None] | None = None, 

1967 resolve_ext_ref: ResolveExtRefFn | None = None, 

1968 ) -> Iterator[PackIndexEntry]: 

1969 """Yield entries summarizing the contents of this pack. 

1970 

1971 Args: 

1972 progress: Progress function, called with current and total 

1973 object count. 

1974 resolve_ext_ref: Optional function to resolve external references 

1975 Returns: iterator of tuples with (sha, offset, crc32) 

1976 """ 

1977 num_objects = self._num_objects 

1978 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref) 

1979 for i, result in enumerate(indexer): 

1980 if progress is not None: 

1981 progress(i, num_objects) 

1982 yield result 

1983 

1984 def sorted_entries( 

1985 self, 

1986 progress: Callable[[int, int], None] | None = None, 

1987 resolve_ext_ref: ResolveExtRefFn | None = None, 

1988 ) -> list[tuple[RawObjectID, int, int]]: 

1989 """Return entries in this pack, sorted by SHA. 

1990 

1991 Args: 

1992 progress: Progress function, called with current and total 

1993 object count 

1994 resolve_ext_ref: Optional function to resolve external references 

1995 Returns: List of tuples with (sha, offset, crc32), sorted by SHA 

1996 """ 

1997 return sorted( 

1998 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) # type: ignore 

1999 ) 

2000 

2001 def create_index_v1( 

2002 self, 

2003 filename: str, 

2004 progress: Callable[..., None] | None = None, 

2005 resolve_ext_ref: ResolveExtRefFn | None = None, 

2006 ) -> bytes: 

2007 """Create a version 1 file for this data file. 

2008 

2009 Args: 

2010 filename: Index filename. 

2011 progress: Progress report function 

2012 resolve_ext_ref: Optional function to resolve external references 

2013 Returns: Checksum of index file 

2014 """ 

2015 entries = self.sorted_entries( 

2016 progress=progress, resolve_ext_ref=resolve_ext_ref 

2017 ) 

2018 checksum = self.calculate_checksum() 

2019 with GitFile(filename, "wb") as f: 

2020 write_pack_index_v1( 

2021 f, 

2022 entries, 

2023 checksum, 

2024 ) 

2025 return checksum 

2026 

2027 def create_index_v2( 

2028 self, 

2029 filename: str, 

2030 progress: Callable[..., None] | None = None, 

2031 resolve_ext_ref: ResolveExtRefFn | None = None, 

2032 ) -> bytes: 

2033 """Create a version 2 index file for this data file. 

2034 

2035 Args: 

2036 filename: Index filename. 

2037 progress: Progress report function 

2038 resolve_ext_ref: Optional function to resolve external references 

2039 Returns: Checksum of index file 

2040 """ 

2041 entries = self.sorted_entries( 

2042 progress=progress, resolve_ext_ref=resolve_ext_ref 

2043 ) 

2044 with GitFile(filename, "wb") as f: 

2045 return write_pack_index_v2(f, entries, self.calculate_checksum()) 

2046 

2047 def create_index_v3( 

2048 self, 

2049 filename: str, 

2050 progress: Callable[..., None] | None = None, 

2051 resolve_ext_ref: ResolveExtRefFn | None = None, 

2052 hash_format: int | None = None, 

2053 ) -> bytes: 

2054 """Create a version 3 index file for this data file. 

2055 

2056 Args: 

2057 filename: Index filename. 

2058 progress: Progress report function 

2059 resolve_ext_ref: Function to resolve external references 

2060 hash_format: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

2061 Returns: Checksum of index file 

2062 """ 

2063 entries = self.sorted_entries( 

2064 progress=progress, resolve_ext_ref=resolve_ext_ref 

2065 ) 

2066 with GitFile(filename, "wb") as f: 

2067 if hash_format is None: 

2068 hash_format = 1 # Default to SHA-1 

2069 return write_pack_index_v3( 

2070 f, entries, self.calculate_checksum(), hash_format=hash_format 

2071 ) 

2072 

2073 def create_index( 

2074 self, 

2075 filename: str, 

2076 progress: Callable[..., None] | None = None, 

2077 version: int = 2, 

2078 resolve_ext_ref: ResolveExtRefFn | None = None, 

2079 hash_format: int | None = None, 

2080 ) -> bytes: 

2081 """Create an index file for this data file. 

2082 

2083 Args: 

2084 filename: Index filename. 

2085 progress: Progress report function 

2086 version: Index version (1, 2, or 3) 

2087 resolve_ext_ref: Function to resolve external references 

2088 hash_format: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256) 

2089 Returns: Checksum of index file 

2090 """ 

2091 if version == 1: 

2092 return self.create_index_v1( 

2093 filename, progress, resolve_ext_ref=resolve_ext_ref 

2094 ) 

2095 elif version == 2: 

2096 return self.create_index_v2( 

2097 filename, progress, resolve_ext_ref=resolve_ext_ref 

2098 ) 

2099 elif version == 3: 

2100 return self.create_index_v3( 

2101 filename, 

2102 progress, 

2103 resolve_ext_ref=resolve_ext_ref, 

2104 hash_format=hash_format, 

2105 ) 

2106 else: 

2107 raise ValueError(f"unknown index format {version}") 

2108 

2109 def get_stored_checksum(self) -> bytes: 

2110 """Return the expected checksum stored in this pack.""" 

2111 checksum_size = self.object_format.oid_length 

2112 self._file.seek(-checksum_size, SEEK_END) 

2113 return self._file.read(checksum_size) 

2114 

2115 def check(self) -> None: 

2116 """Check the consistency of this pack.""" 

2117 actual = self.calculate_checksum() 

2118 stored = self.get_stored_checksum() 

2119 if actual != stored: 

2120 raise ChecksumMismatch(stored, actual) 

2121 

2122 def get_unpacked_object_at( 

2123 self, offset: int, *, include_comp: bool = False 

2124 ) -> UnpackedObject: 

2125 """Given offset in the packfile return a UnpackedObject.""" 

2126 assert offset >= self._header_size 

2127 self._file.seek(offset) 

2128 unpacked, _ = unpack_object( 

2129 self._file.read, self.object_format.hash_func, include_comp=include_comp 

2130 ) 

2131 unpacked.offset = offset 

2132 return unpacked 

2133 

2134 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]: 

2135 """Given an offset in to the packfile return the object that is there. 

2136 

2137 Using the associated index the location of an object can be looked up, 

2138 and then the packfile can be asked directly for that object using this 

2139 function. 

2140 """ 

2141 try: 

2142 return self._offset_cache[offset] 

2143 except KeyError: 

2144 pass 

2145 unpacked = self.get_unpacked_object_at(offset, include_comp=False) 

2146 return (unpacked.pack_type_num, unpacked._obj()) 
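
# Usage sketch (pack filenames hypothetical): resolve an object through its
# index, then fetch it from the data file by offset:
#
#   idx = load_pack_index("pack-1234.idx")
#   with PackData.from_path("pack-1234.pack", object_format) as data:
#       type_num, obj = data.get_object_at(idx.object_offset(sha))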

2147 

2148 

2149T = TypeVar("T") 

2150 

2151 

2152class DeltaChainIterator(Generic[T]): 

2153 """Abstract iterator over pack data based on delta chains. 

2154 

2155 Each object in the pack is guaranteed to be inflated exactly once, 

2156 regardless of how many objects reference it as a delta base. As a result, 

2157 memory usage is proportional to the length of the longest delta chain. 

2158 

2159 Subclasses can override _result to define the result type of the iterator. 

2160 By default, results are UnpackedObjects with the following members set: 

2161 

2162 * offset 

2163 * obj_type_num 

2164 * obj_chunks 

2165 * pack_type_num 

2166 * delta_base (for delta types) 

2167 * comp_chunks (if _include_comp is True) 

2168 * decomp_chunks 

2169 * decomp_len 

2170 * crc32 (if _compute_crc32 is True) 

2171 """ 

2172 

2173 _compute_crc32 = False 

2174 _include_comp = False 

2175 

2176 def __init__( 

2177 self, 

2178 file_obj: IO[bytes] | None, 

2179 hash_func: Callable[[], "HashObject"], 

2180 *, 

2181 resolve_ext_ref: ResolveExtRefFn | None = None, 

2182 ) -> None: 

2183 """Initialize DeltaChainIterator. 

2184 

2185 Args: 

2186 file_obj: File object to read pack data from 

2187 hash_func: Hash function to use for computing object IDs 

2188 resolve_ext_ref: Optional function to resolve external references 

2189 """ 

2190 self._file = file_obj 

2191 self.hash_func = hash_func 

2192 self._resolve_ext_ref = resolve_ext_ref 

2193 self._pending_ofs: dict[int, list[int]] = defaultdict(list) 

2194 self._pending_ref: dict[bytes, list[int]] = defaultdict(list) 

2195 self._full_ofs: list[tuple[int, int]] = [] 

2196 self._ext_refs: list[RawObjectID] = [] 

2197 

2198 @classmethod 

2199 def for_pack_data( 

2200 cls, pack_data: PackData, resolve_ext_ref: ResolveExtRefFn | None = None 

2201 ) -> "DeltaChainIterator[T]": 

2202 """Create a DeltaChainIterator from pack data. 

2203 

2204 Args: 

2205 pack_data: PackData object to iterate 

2206 resolve_ext_ref: Optional function to resolve external refs 

2207 

2208 Returns: 

2209 DeltaChainIterator instance 

2210 """ 

2211 walker = cls( 

2212 None, pack_data.object_format.hash_func, resolve_ext_ref=resolve_ext_ref 

2213 ) 

2214 walker.set_pack_data(pack_data) 

2215 for unpacked in pack_data.iter_unpacked(include_comp=False): 

2216 walker.record(unpacked) 

2217 return walker 

2218 

2219 @classmethod 

2220 def for_pack_subset( 

2221 cls, 

2222 pack: "Pack", 

2223 shas: Iterable[ObjectID | RawObjectID], 

2224 *, 

2225 allow_missing: bool = False, 

2226 resolve_ext_ref: ResolveExtRefFn | None = None, 

2227 ) -> "DeltaChainIterator[T]": 

2228 """Create a DeltaChainIterator for a subset of objects. 

2229 

2230 Args: 

2231 pack: Pack object containing the data 

2232 shas: Iterable of object SHAs to include 

2233 allow_missing: If True, skip missing objects 

2234 resolve_ext_ref: Optional function to resolve external refs 

2235 

2236 Returns: 

2237 DeltaChainIterator instance 

2238 """ 

2239 walker = cls( 

2240 None, pack.object_format.hash_func, resolve_ext_ref=resolve_ext_ref 

2241 ) 

2242 walker.set_pack_data(pack.data) 

2243 todo = set() 

2244 for sha in shas: 

2245 try: 

2246 off = pack.index.object_offset(sha) 

2247 except KeyError: 

2248 if not allow_missing: 

2249 raise 

2250 else: 

2251 todo.add(off) 

2252 done = set() 

2253 while todo: 

2254 off = todo.pop() 

2255 unpacked = pack.data.get_unpacked_object_at(off) 

2256 walker.record(unpacked) 

2257 done.add(off) 

2258 base_ofs = None 

2259 if unpacked.pack_type_num == OFS_DELTA: 

2260 assert unpacked.offset is not None 

2261 assert unpacked.delta_base is not None 

2262 assert isinstance(unpacked.delta_base, int) 

2263 base_ofs = unpacked.offset - unpacked.delta_base 

2264 elif unpacked.pack_type_num == REF_DELTA: 

2265 with suppress(KeyError): 

2266 assert isinstance(unpacked.delta_base, bytes) 

2267 base_ofs = pack.index.object_offset( 

2268 RawObjectID(unpacked.delta_base) 

2269 ) 

2270 if base_ofs is not None and base_ofs not in done: 

2271 todo.add(base_ofs) 

2272 return walker 

2273 

2274 def record(self, unpacked: UnpackedObject) -> None: 

2275 """Record an unpacked object for later processing. 

2276 

2277 Args: 

2278 unpacked: UnpackedObject to record 

2279 """ 

2280 type_num = unpacked.pack_type_num 

2281 offset = unpacked.offset 

2282 assert offset is not None 

2283 if type_num == OFS_DELTA: 

2284 assert unpacked.delta_base is not None 

2285 assert isinstance(unpacked.delta_base, int) 

2286 base_offset = offset - unpacked.delta_base 

2287 self._pending_ofs[base_offset].append(offset) 

2288 elif type_num == REF_DELTA: 

2289 assert isinstance(unpacked.delta_base, bytes) 

2290 self._pending_ref[unpacked.delta_base].append(offset) 

2291 else: 

2292 self._full_ofs.append((offset, type_num)) 

2293 

2294 def set_pack_data(self, pack_data: PackData) -> None: 

2295 """Set the pack data for iteration. 

2296 

2297 Args: 

2298 pack_data: PackData object to use 

2299 """ 

2300 self._file = pack_data._file 

2301 

2302 def _walk_all_chains(self) -> Iterator[T]: 

2303 for offset, type_num in self._full_ofs: 

2304 yield from self._follow_chain(offset, type_num, None) 

2305 yield from self._walk_ref_chains() 

2306 assert not self._pending_ofs, repr(self._pending_ofs) 

2307 

2308 def _ensure_no_pending(self) -> None: 

2309 if self._pending_ref: 

2310 raise UnresolvedDeltas( 

2311 [sha_to_hex(RawObjectID(s)) for s in self._pending_ref] 

2312 ) 

2313 

2314 def _walk_ref_chains(self) -> Iterator[T]: 

2315 if not self._resolve_ext_ref: 

2316 self._ensure_no_pending() 

2317 return 

2318 

2319 for base_sha, pending in sorted(self._pending_ref.items()): 

2320 if base_sha not in self._pending_ref:  # already popped by an earlier _follow_chain 

2321 continue 

2322 try: 

2323 type_num, chunks = self._resolve_ext_ref(base_sha) 

2324 except KeyError: 

2325 # Not an external ref, but may depend on one. Either it will 

2326 # get popped via a _follow_chain call, or we will raise an 

2327 # error below. 

2328 continue 

2329 self._ext_refs.append(RawObjectID(base_sha)) 

2330 self._pending_ref.pop(base_sha) 

2331 for new_offset in pending: 

2332 yield from self._follow_chain(new_offset, type_num, chunks) # type: ignore[arg-type] 

2333 

2334 self._ensure_no_pending() 

2335 

2336 def _result(self, unpacked: UnpackedObject) -> T: 

2337 raise NotImplementedError 

2338 

2339 def _resolve_object( 

2340 self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None 

2341 ) -> UnpackedObject: 

2342 assert self._file is not None 

2343 self._file.seek(offset) 

2344 unpacked, _ = unpack_object( 

2345 self._file.read, 

2346 self.hash_func, 

2347 read_some=None, 

2348 compute_crc32=self._compute_crc32, 

2349 include_comp=self._include_comp, 

2350 ) 

2351 unpacked.offset = offset 

2352 if base_chunks is None: 

2353 assert unpacked.pack_type_num == obj_type_num 

2354 else: 

2355 assert unpacked.pack_type_num in DELTA_TYPES 

2356 unpacked.obj_type_num = obj_type_num 

2357 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks) 

2358 return unpacked 

2359 

2360 def _follow_chain( 

2361 self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None 

2362 ) -> Iterator[T]: 

2363 # Unlike PackData.get_object_at, there is no need to cache offsets as 

2364 # this approach by design inflates each object exactly once. 

2365 todo = [(offset, obj_type_num, base_chunks)] 

2366 while todo: 

2367 (offset, obj_type_num, base_chunks) = todo.pop() 

2368 unpacked = self._resolve_object(offset, obj_type_num, base_chunks) 

2369 yield self._result(unpacked) 

2370 

2371 assert unpacked.offset is not None 

2372 unblocked = chain( 

2373 self._pending_ofs.pop(unpacked.offset, []), 

2374 self._pending_ref.pop(unpacked.sha(), []), 

2375 ) 

2376 todo.extend( 

2377 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore 

2378 for new_offset in unblocked 

2379 ) 

2380 

2381 def __iter__(self) -> Iterator[T]: 

2382 """Iterate over objects in the pack.""" 

2383 return self._walk_all_chains() 

2384 

2385 def ext_refs(self) -> list[RawObjectID]: 

2386 """Return external references.""" 

2387 return self._ext_refs 

2388 

2389 

2390class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]): 

2391 """Delta chain iterator that yield unpacked objects.""" 

2392 

2393 def _result(self, unpacked: UnpackedObject) -> UnpackedObject: 

2394 """Return the unpacked object. 

2395 

2396 Args: 

2397 unpacked: The unpacked object 

2398 

2399 Returns: 

2400 The unpacked object unchanged 

2401 """ 

2402 return unpacked 

2403 

2404 

2405class PackIndexer(DeltaChainIterator[PackIndexEntry]): 

2406 """Delta chain iterator that yields index entries.""" 

2407 

2408 _compute_crc32 = True 

2409 

2410 def _result(self, unpacked: UnpackedObject) -> PackIndexEntry: 

2411 """Convert unpacked object to pack index entry. 

2412 

2413 Args: 

2414 unpacked: The unpacked object 

2415 

2416 Returns: 

2417 Tuple of (sha, offset, crc32) for index entry 

2418 """ 

2419 assert unpacked.offset is not None 

2420 return unpacked.sha(), unpacked.offset, unpacked.crc32 

2421 

2422 

2423class PackInflater(DeltaChainIterator[ShaFile]): 

2424 """Delta chain iterator that yields ShaFile objects.""" 

2425 

2426 def _result(self, unpacked: UnpackedObject) -> ShaFile: 

2427 """Convert unpacked object to ShaFile. 

2428 

2429 Args: 

2430 unpacked: The unpacked object 

2431 

2432 Returns: 

2433 ShaFile object from the unpacked data 

2434 """ 

2435 return unpacked.sha_file() 

2436 

2437 

2438class SHA1Reader(BinaryIO): 

2439 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2440 

2441 def __init__(self, f: IO[bytes]) -> None: 

2442 """Initialize SHA1Reader. 

2443 

2444 Args: 

2445 f: File-like object to wrap 

2446 """ 

2447 self.f = f 

2448 self.sha1 = sha1(b"") 

2449 

2450 def read(self, size: int = -1) -> bytes: 

2451 """Read bytes and update SHA1. 

2452 

2453 Args: 

2454 size: Number of bytes to read, -1 for all 

2455 

2456 Returns: 

2457 Bytes read from file 

2458 """ 

2459 data = self.f.read(size) 

2460 self.sha1.update(data) 

2461 return data 

2462 

2463 def check_sha(self, allow_empty: bool = False) -> None: 

2464 """Check if the SHA1 matches the expected value. 

2465 

2466 Args: 

2467 allow_empty: Allow empty SHA1 hash 

2468 

2469 Raises: 

2470 ChecksumMismatch: If SHA1 doesn't match 

2471 """ 

2472 stored = self.f.read(20) 

2473 # If git option index.skipHash is set the index will be empty 

2474 if stored != self.sha1.digest() and ( 

2475 not allow_empty 

2476 or ( 

2477 len(stored) == 20 

2478 and sha_to_hex(RawObjectID(stored)) 

2479 != b"0000000000000000000000000000000000000000" 

2480 ) 

2481 ): 

2482 raise ChecksumMismatch( 

2483 self.sha1.hexdigest(), 

2484 sha_to_hex(RawObjectID(stored)) if stored else b"", 

2485 ) 

2486 

2487 def close(self) -> None: 

2488 """Close the underlying file.""" 

2489 return self.f.close() 

2490 

2491 def tell(self) -> int: 

2492 """Return current file position.""" 

2493 return self.f.tell() 

2494 

2495 # BinaryIO abstract methods 

2496 def readable(self) -> bool: 

2497 """Check if file is readable.""" 

2498 return True 

2499 

2500 def writable(self) -> bool: 

2501 """Check if file is writable.""" 

2502 return False 

2503 

2504 def seekable(self) -> bool: 

2505 """Check if file is seekable.""" 

2506 return getattr(self.f, "seekable", lambda: False)() 

2507 

2508 def seek(self, offset: int, whence: int = 0) -> int: 

2509 """Seek to position in file. 

2510 

2511 Args: 

2512 offset: Position offset 

2513 whence: Reference point (0=start, 1=current, 2=end) 

2514 

2515 Returns: 

2516 New file position 

2517 """ 

2518 return self.f.seek(offset, whence) 

2519 

2520 def flush(self) -> None: 

2521 """Flush the file buffer.""" 

2522 if hasattr(self.f, "flush"): 

2523 self.f.flush() 

2524 

2525 def readline(self, size: int = -1) -> bytes: 

2526 """Read a line from the file. 

2527 

2528 Args: 

2529 size: Maximum bytes to read 

2530 

2531 Returns: 

2532 Line read from file 

2533 """ 

2534 return self.f.readline(size) 

2535 

2536 def readlines(self, hint: int = -1) -> list[bytes]: 

2537 """Read all lines from the file. 

2538 

2539 Args: 

2540 hint: Approximate number of bytes to read 

2541 

2542 Returns: 

2543 List of lines 

2544 """ 

2545 return self.f.readlines(hint) 

2546 

2547 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2548 """Write multiple lines to the file (not supported).""" 

2549 raise UnsupportedOperation("writelines") 

2550 

2551 def write(self, data: bytes, /) -> int: # type: ignore[override] 

2552 """Write data to the file (not supported).""" 

2553 raise UnsupportedOperation("write") 

2554 

2555 def __enter__(self) -> "SHA1Reader": 

2556 """Enter context manager.""" 

2557 return self 

2558 

2559 def __exit__( 

2560 self, 

2561 type: type | None, 

2562 value: BaseException | None, 

2563 traceback: TracebackType | None, 

2564 ) -> None: 

2565 """Exit context manager and close file.""" 

2566 self.close() 

2567 

2568 def __iter__(self) -> "SHA1Reader": 

2569 """Return iterator for reading file lines.""" 

2570 return self 

2571 

2572 def __next__(self) -> bytes: 

2573 """Get next line from file. 

2574 

2575 Returns: 

2576 Next line 

2577 

2578 Raises: 

2579 StopIteration: When no more lines 

2580 """ 

2581 line = self.readline() 

2582 if not line: 

2583 raise StopIteration 

2584 return line 

2585 

2586 def fileno(self) -> int: 

2587 """Return file descriptor number.""" 

2588 return self.f.fileno() 

2589 

2590 def isatty(self) -> bool: 

2591 """Check if file is a terminal.""" 

2592 return getattr(self.f, "isatty", lambda: False)() 

2593 

2594 def truncate(self, size: int | None = None) -> int: 

2595 """Not supported for read-only file. 

2596 

2597 Raises: 

2598 UnsupportedOperation: Always raised 

2599 """ 

2600 raise UnsupportedOperation("truncate") 
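
# A minimal sketch: SHA1Reader verifies a stream whose final 20 bytes are the
# SHA-1 of everything before them (the trailer convention used above).
from io import BytesIO

body = b"index body"
reader = SHA1Reader(BytesIO(body + sha1(body).digest()))
reader.read(len(body))
reader.check_sha()  # raises ChecksumMismatch only if the trailer is wrong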

2601 

2602 

2603class SHA1Writer(BinaryIO): 

2604 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2605 

2606 def __init__(self, f: BinaryIO | IO[bytes]) -> None: 

2607 """Initialize SHA1Writer. 

2608 

2609 Args: 

2610 f: File-like object to wrap 

2611 """ 

2612 self.f = f 

2613 self.length = 0 

2614 self.sha1 = sha1(b"") 

2615 self.digest: bytes | None = None 

2616 

2617 def write(self, data: bytes | bytearray | memoryview, /) -> int: # type: ignore[override] 

2618 """Write data and update SHA1. 

2619 

2620 Args: 

2621 data: Data to write 

2622 

2623 Returns: 

2624 Number of bytes written 

2625 """ 

2626 self.sha1.update(data) 

2627 written = self.f.write(data) 

2628 self.length += written 

2629 return written 

2630 

2631 def write_sha(self) -> bytes: 

2632 """Write the SHA1 digest to the file. 

2633 

2634 Returns: 

2635 The SHA1 digest bytes 

2636 """ 

2637 sha = self.sha1.digest() 

2638 assert len(sha) == 20 

2639 self.f.write(sha) 

2640 self.length += len(sha) 

2641 return sha 

2642 

2643 def close(self) -> None: 

2644 """Close the pack file and finalize the SHA.""" 

2645 self.digest = self.write_sha() 

2646 self.f.close() 

2647 

2648 def offset(self) -> int: 

2649 """Get the total number of bytes written. 

2650 

2651 Returns: 

2652 Total bytes written 

2653 """ 

2654 return self.length 

2655 

2656 def tell(self) -> int: 

2657 """Return current file position.""" 

2658 return self.f.tell() 

2659 

2660 # BinaryIO abstract methods 

2661 def readable(self) -> bool: 

2662 """Check if file is readable.""" 

2663 return False 

2664 

2665 def writable(self) -> bool: 

2666 """Check if file is writable.""" 

2667 return True 

2668 

2669 def seekable(self) -> bool: 

2670 """Check if file is seekable.""" 

2671 return getattr(self.f, "seekable", lambda: False)() 

2672 

2673 def seek(self, offset: int, whence: int = 0) -> int: 

2674 """Seek to position in file. 

2675 

2676 Args: 

2677 offset: Position offset 

2678 whence: Reference point (0=start, 1=current, 2=end) 

2679 

2680 Returns: 

2681 New file position 

2682 """ 

2683 return self.f.seek(offset, whence) 

2684 

2685 def flush(self) -> None: 

2686 """Flush the file buffer.""" 

2687 if hasattr(self.f, "flush"): 

2688 self.f.flush() 

2689 

2690 def readline(self, size: int = -1) -> bytes: 

2691 """Not supported for write-only file. 

2692 

2693 Raises: 

2694 UnsupportedOperation: Always raised 

2695 """ 

2696 raise UnsupportedOperation("readline") 

2697 

2698 def readlines(self, hint: int = -1) -> list[bytes]: 

2699 """Not supported for write-only file. 

2700 

2701 Raises: 

2702 UnsupportedOperation: Always raised 

2703 """ 

2704 raise UnsupportedOperation("readlines") 

2705 

2706 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2707 """Write multiple lines to the file. 

2708 

2709 Args: 

2710 lines: Iterable of lines to write 

2711 """ 

2712 for line in lines: 

2713 self.write(line) 

2714 

2715 def read(self, size: int = -1) -> bytes: 

2716 """Not supported for write-only file. 

2717 

2718 Raises: 

2719 UnsupportedOperation: Always raised 

2720 """ 

2721 raise UnsupportedOperation("read") 

2722 

2723 def __enter__(self) -> "SHA1Writer": 

2724 """Enter context manager.""" 

2725 return self 

2726 

2727 def __exit__( 

2728 self, 

2729 type: type | None, 

2730 value: BaseException | None, 

2731 traceback: TracebackType | None, 

2732 ) -> None: 

2733 """Exit context manager and close file.""" 

2734 self.f.close() 

2735 

2736 def __iter__(self) -> "SHA1Writer": 

2737 """Return iterator.""" 

2738 return self 

2739 

2740 def __next__(self) -> bytes: 

2741 """Not supported for write-only file. 

2742 

2743 Raises: 

2744 UnsupportedOperation: Always raised 

2745 """ 

2746 raise UnsupportedOperation("__next__") 

2747 

2748 def fileno(self) -> int: 

2749 """Return file descriptor number.""" 

2750 return self.f.fileno() 

2751 

2752 def isatty(self) -> bool: 

2753 """Check if file is a terminal.""" 

2754 return getattr(self.f, "isatty", lambda: False)() 

2755 

2756 def truncate(self, size: int | None = None) -> int: 

2757 """Not supported for write-only file. 

2758 

2759 Raises: 

2760 UnsupportedOperation: Always raised 

2761 """ 

2762 raise UnsupportedOperation("truncate") 

2763 
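
# A minimal sketch: everything written through SHA1Writer is hashed, and
# write_sha() appends the 20-byte digest, forming the standard pack trailer.
from io import BytesIO

out = BytesIO()
writer = SHA1Writer(out)
writer.write(b"PACK")
trailer = writer.write_sha()
assert out.getvalue() == b"PACK" + trailer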

2764 

2765class HashWriter(BinaryIO): 

2766 """Wrapper for file-like object that computes hash of its data. 

2767 

2768 This is a generic version that works with any hash algorithm. 

2769 """ 

2770 

2771 def __init__( 

2772 self, f: BinaryIO | IO[bytes], hash_func: Callable[[], "HashObject"] 

2773 ) -> None: 

2774 """Initialize HashWriter. 

2775 

2776 Args: 

2777 f: File-like object to wrap 

2778 hash_func: Hash function (e.g., sha1, sha256) 

2779 """ 

2780 self.f = f 

2781 self.length = 0 

2782 self.hash_obj = hash_func() 

2783 self.digest: bytes | None = None 

2784 

2785 def write(self, data: bytes | bytearray | memoryview, /) -> int: # type: ignore[override] 

2786 """Write data and update hash. 

2787 

2788 Args: 

2789 data: Data to write 

2790 

2791 Returns: 

2792 Number of bytes written 

2793 """ 

2794 self.hash_obj.update(data) 

2795 written = self.f.write(data) 

2796 self.length += written 

2797 return written 

2798 

2799 def write_hash(self) -> bytes: 

2800 """Write the hash digest to the file. 

2801 

2802 Returns: 

2803 The hash digest bytes 

2804 """ 

2805 digest = self.hash_obj.digest() 

2806 self.f.write(digest) 

2807 self.length += len(digest) 

2808 return digest 

2809 

2810 def close(self) -> None: 

2811 """Close the pack file and finalize the hash.""" 

2812 self.digest = self.write_hash() 

2813 self.f.close() 

2814 

2815 def offset(self) -> int: 

2816 """Get the total number of bytes written. 

2817 

2818 Returns: 

2819 Total bytes written 

2820 """ 

2821 return self.length 

2822 

2823 def tell(self) -> int: 

2824 """Return current file position.""" 

2825 return self.f.tell() 

2826 

2827 # BinaryIO abstract methods 

2828 def readable(self) -> bool: 

2829 """Check if file is readable.""" 

2830 return False 

2831 

2832 def writable(self) -> bool: 

2833 """Check if file is writable.""" 

2834 return True 

2835 

2836 def seekable(self) -> bool: 

2837 """Check if file is seekable.""" 

2838 return getattr(self.f, "seekable", lambda: False)() 

2839 

2840 def seek(self, offset: int, whence: int = 0) -> int: 

2841 """Seek to position in file. 

2842 

2843 Args: 

2844 offset: Position offset 

2845 whence: Reference point (0=start, 1=current, 2=end) 

2846 

2847 Returns: 

2848 New file position 

2849 """ 

2850 return self.f.seek(offset, whence) 

2851 

2852 def flush(self) -> None: 

2853 """Flush the file buffer.""" 

2854 if hasattr(self.f, "flush"): 

2855 self.f.flush() 

2856 

2857 def readline(self, size: int = -1) -> bytes: 

2858 """Not supported for write-only file. 

2859 

2860 Raises: 

2861 UnsupportedOperation: Always raised 

2862 """ 

2863 raise UnsupportedOperation("readline") 

2864 

2865 def readlines(self, hint: int = -1) -> list[bytes]: 

2866 """Not supported for write-only file. 

2867 

2868 Raises: 

2869 UnsupportedOperation: Always raised 

2870 """ 

2871 raise UnsupportedOperation("readlines") 

2872 

2873 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2874 """Write multiple lines to the file. 

2875 

2876 Args: 

2877 lines: Iterable of lines to write 

2878 """ 

2879 for line in lines: 

2880 self.write(line) 

2881 

2882 def read(self, size: int = -1) -> bytes: 

2883 """Not supported for write-only file. 

2884 

2885 Raises: 

2886 UnsupportedOperation: Always raised 

2887 """ 

2888 raise UnsupportedOperation("read") 

2889 

2890 def __enter__(self) -> "HashWriter": 

2891 """Enter context manager.""" 

2892 return self 

2893 

2894 def __exit__( 

2895 self, 

2896 type: type | None, 

2897 value: BaseException | None, 

2898 traceback: TracebackType | None, 

2899 ) -> None: 

2900 """Exit context manager and close file.""" 

2901 self.close() 

2902 

2903 def __iter__(self) -> "HashWriter": 

2904 """Return iterator.""" 

2905 return self 

2906 

2907 def __next__(self) -> bytes: 

2908 """Not supported for write-only file. 

2909 

2910 Raises: 

2911 UnsupportedOperation: Always raised 

2912 """ 

2913 raise UnsupportedOperation("__next__") 

2914 

2915 def fileno(self) -> int: 

2916 """Return file descriptor number.""" 

2917 return self.f.fileno() 

2918 

2919 def isatty(self) -> bool: 

2920 """Check if file is a terminal.""" 

2921 return getattr(self.f, "isatty", lambda: False)() 

2922 

2923 def truncate(self, size: int | None = None) -> int: 

2924 """Not supported for write-only file. 

2925 

2926 Raises: 

2927 UnsupportedOperation: Always raised 

2928 """ 

2929 raise UnsupportedOperation("truncate") 

2930 

2931 

2932def pack_object_header( 

2933 type_num: int, 

2934 delta_base: bytes | int | None, 

2935 size: int, 

2936 object_format: "ObjectFormat", 

2937) -> bytearray: 

2938 """Create a pack object header for the given object info. 

2939 

2940 Args: 

2941 type_num: Numeric type of the object. 

2942 delta_base: Delta base offset or ref, or None for whole objects. 

2943 size: Uncompressed object size. 

2944 object_format: Object format (hash algorithm) to use. 

2945 Returns: A header for a packed object. 

2946 """ 

2947 header = [] 

2948 c = (type_num << 4) | (size & 15) 

2949 size >>= 4 

2950 while size: 

2951 header.append(c | 0x80) 

2952 c = size & 0x7F 

2953 size >>= 7 

2954 header.append(c) 

2955 if type_num == OFS_DELTA: 

2956 assert isinstance(delta_base, int) 

2957 ret = [delta_base & 0x7F] 

2958 delta_base >>= 7 

2959 while delta_base: 

2960 delta_base -= 1 

2961 ret.insert(0, 0x80 | (delta_base & 0x7F)) 

2962 delta_base >>= 7 

2963 header.extend(ret) 

2964 elif type_num == REF_DELTA: 

2965 assert isinstance(delta_base, bytes) 

2966 assert len(delta_base) == object_format.oid_length 

2967 header += delta_base 

2968 return bytearray(header) 
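
# Worked example (a sketch): a 100-byte non-delta blob (type 3) encodes as two
# header bytes: 0xB4 = 0x80 | (3 << 4) | (100 & 15), then 0x06 = 100 >> 4.
# object_format is only consulted for REF_DELTA, so None suffices here.
assert bytes(pack_object_header(3, None, 100, object_format=None)) == b"\xb4\x06"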

2969 

2970 

2971def pack_object_chunks( 

2972 type: int, 

2973 object: list[bytes] | tuple[bytes | int, list[bytes]], 

2974 object_format: "ObjectFormat", 

2975 *, 

2976 compression_level: int = -1, 

2977) -> Iterator[bytes]: 

2978 """Generate chunks for a pack object. 

2979 

2980 Args: 

2981 type: Numeric type of the object 

2982 object: Object to write 

2983 object_format: Object format (hash algorithm) to use 

2984 compression_level: the zlib compression level 

2985 Returns: Chunks 

2986 """ 

2987 if type in DELTA_TYPES: 

2988 if isinstance(object, tuple): 

2989 delta_base, object = object 

2990 else: 

2991 raise TypeError("Delta types require a tuple of (delta_base, object)") 

2992 else: 

2993 delta_base = None 

2994 

2995 # Convert object to list of bytes chunks 

2996 if isinstance(object, bytes): 

2997 chunks = [object] 

2998 elif isinstance(object, list): 

2999 chunks = object 

3000 elif isinstance(object, ShaFile): 

3001 chunks = object.as_raw_chunks() 

3002 else: 

3003 # Shouldn't reach here with proper typing 

3004 raise TypeError(f"Unexpected object type: {object.__class__.__name__}") 

3005 

3006 yield bytes( 

3007 pack_object_header( 

3008 type, delta_base, sum(map(len, chunks)), object_format=object_format 

3009 ) 

3010 ) 

3011 compressor = zlib.compressobj(level=compression_level) 

3012 for data in chunks: 

3013 yield compressor.compress(data) 

3014 yield compressor.flush() 
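
# A minimal sketch: the first chunk is the header; the remaining chunks form a
# single zlib stream of the raw object bytes (object_format is unused for
# non-delta types, so None suffices in this illustration).
chunks = list(pack_object_chunks(3, [b"hello"], object_format=None))
assert zlib.decompress(b"".join(chunks[1:])) == b"hello"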

3015 

3016 

3017def write_pack_object( 

3018 write: Callable[[bytes], int], 

3019 type: int, 

3020 object: list[bytes] | tuple[bytes | int, list[bytes]], 

3021 object_format: "ObjectFormat", 

3022 *, 

3023 sha: "HashObject | None" = None, 

3024 compression_level: int = -1, 

3025) -> int: 

3026 """Write pack object to a file. 

3027 

3028 Args: 

3029 write: Write function to use 

3030 type: Numeric type of the object 

3031 object: Object to write 

3032 object_format: Object format (hash algorithm) to use 

3033 sha: Optional SHA-1 hasher to update 

3034 compression_level: the zlib compression level 

3035 Returns: CRC32 checksum of the written object 

3036 """ 

3037 crc32 = 0 

3038 for chunk in pack_object_chunks( 

3039 type, object, compression_level=compression_level, object_format=object_format 

3040 ): 

3041 write(chunk) 

3042 if sha is not None: 

3043 sha.update(chunk) 

3044 crc32 = binascii.crc32(chunk, crc32) 

3045 return crc32 & 0xFFFFFFFF 

3046 

3047 

3048def write_pack( 

3049 filename: str, 

3050 objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]], 

3051 object_format: "ObjectFormat", 

3052 *, 

3053 deltify: bool | None = None, 

3054 delta_window_size: int | None = None, 

3055 compression_level: int = -1, 

3056) -> tuple[bytes, bytes]: 

3057 """Write a new pack data file. 

3058 

3059 Args: 

3060 filename: Path to the new pack file (without .pack extension) 

3061 objects: Objects to write to the pack 

3062 object_format: Object format 

3063 delta_window_size: Delta window size 

3064 deltify: Whether to deltify pack objects 

3065 compression_level: the zlib compression level 

3066 Returns: Tuple with checksum of pack file and index file 

3067 """ 

3068 with GitFile(filename + ".pack", "wb") as f: 

3069 entries, data_sum = write_pack_objects( 

3070 f, 

3071 objects, 

3072 delta_window_size=delta_window_size, 

3073 deltify=deltify, 

3074 compression_level=compression_level, 

3075 object_format=object_format, 

3076 ) 

3077 entries_list = sorted([(k, v[0], v[1]) for (k, v) in entries.items()]) 

3078 with GitFile(filename + ".idx", "wb") as f: 

3079 idx_sha = write_pack_index(f, entries_list, data_sum) 

3080 return data_sum, idx_sha 

3081 

3082 

3083def pack_header_chunks(num_objects: int) -> Iterator[bytes]: 

3084 """Yield chunks for a pack header.""" 

3085 yield b"PACK" # Pack header 

3086 yield struct.pack(b">L", 2) # Pack version 

3087 yield struct.pack(b">L", num_objects) # Number of objects in pack 
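
# A minimal sketch: the fixed 12-byte header for a pack holding three objects.
assert b"".join(pack_header_chunks(3)) == b"PACK\x00\x00\x00\x02\x00\x00\x00\x03"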

3088 

3089 

3090def write_pack_header( 

3091 write: Callable[[bytes], int] | IO[bytes], num_objects: int 

3092) -> None: 

3093 """Write a pack header for the given number of objects.""" 

3094 write_fn: Callable[[bytes], int] 

3095 if hasattr(write, "write"): 

3096 write_fn = write.write 

3097 warnings.warn( 

3098 "write_pack_header() now takes a write rather than file argument", 

3099 DeprecationWarning, 

3100 stacklevel=2, 

3101 ) 

3102 else: 

3103 write_fn = write 

3104 for chunk in pack_header_chunks(num_objects): 

3105 write_fn(chunk) 

3106 

3107 

3108def find_reusable_deltas( 

3109 container: PackedObjectContainer, 

3110 object_ids: Set[ObjectID], 

3111 *, 

3112 other_haves: Set[ObjectID] | None = None, 

3113 progress: Callable[..., None] | None = None, 

3114) -> Iterator[UnpackedObject]: 

3115 """Find deltas in a pack that can be reused. 

3116 

3117 Args: 

3118 container: Pack container to search for deltas 

3119 object_ids: Set of object IDs to find deltas for 

3120 other_haves: Set of other object IDs we have 

3121 progress: Optional progress reporting callback 

3122 

3123 Returns: 

3124 Iterator of UnpackedObject entries that can be reused 

3125 """ 

3126 if other_haves is None: 

3127 other_haves = set() 

3128 reused = 0 

3129 for i, unpacked in enumerate( 

3130 container.iter_unpacked_subset( 

3131 object_ids, allow_missing=True, convert_ofs_delta=True 

3132 ) 

3133 ): 

3134 if progress is not None and i % 1000 == 0: 

3135 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode()) 

3136 if unpacked.pack_type_num == REF_DELTA: 

3137 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore 

3138 if hexsha in object_ids or hexsha in other_haves: 

3139 yield unpacked 

3140 reused += 1 

3141 if progress is not None: 

3142 progress((f"found {reused} deltas to reuse\n").encode()) 

3143 

3144 

3145def deltify_pack_objects( 

3146 objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, bytes | None]], 

3147 *, 

3148 window_size: int | None = None, 

3149 progress: Callable[..., None] | None = None, 

3150) -> Iterator[UnpackedObject]: 

3151 """Generate deltas for pack objects. 

3152 

3153 Args: 

3154 objects: An iterable of (object, path) tuples to deltify. 

3155 window_size: Window size; None for default 

3156 progress: Optional progress reporting callback 

3157 Returns: Iterator over UnpackedObject entries; delta_base is None 

3158 for full-text entries 

3159 """ 

3160 

3161 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, bytes | None]]]: 

3162 for e in objects: 

3163 if isinstance(e, ShaFile): 

3164 yield (e, (e.type_num, None)) 

3165 else: 

3166 yield (e[0], (e[0].type_num, e[1])) 

3167 

3168 sorted_objs = sort_objects_for_delta(objects_with_hints()) 

3169 yield from deltas_from_sorted_objects( 

3170 sorted_objs, 

3171 window_size=window_size, 

3172 progress=progress, 

3173 ) 

3174 

3175 

3176def sort_objects_for_delta( 

3177 objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, PackHint | None]], 

3178) -> Iterator[tuple[ShaFile, bytes | None]]: 

3179 """Sort objects for optimal delta compression. 

3180 

3181 Args: 

3182 objects: Iterator of objects or (object, hint) tuples 

3183 

3184 Returns: 

3185 Iterator of sorted (ShaFile, path) tuples 

3186 """ 

3187 magic = [] 

3188 for entry in objects: 

3189 if isinstance(entry, tuple): 

3190 obj, hint = entry 

3191 if hint is None: 

3192 type_num = None 

3193 path = None 

3194 else: 

3195 (type_num, path) = hint 

3196 else: 

3197 obj = entry 

3198 type_num = None 

3199 path = None 

3200 magic.append((type_num, path, -obj.raw_length(), obj)) 

3201 # Build a list of objects ordered by the magic Linus heuristic 

3202 # This helps us find good objects to diff against 

3203 magic.sort() 

3204 return ((x[3], x[1]) for x in magic) 
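
# A minimal sketch: plain ShaFiles get sort key (None, None, -raw_length()),
# so within the same type and path the largest object sorts first.
from dulwich.objects import Blob

small, big = Blob.from_string(b"a"), Blob.from_string(b"aaaa")
assert [o for o, _ in sort_objects_for_delta(iter([small, big]))] == [big, small]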

3205 

3206 

3207def deltas_from_sorted_objects( 

3208 objects: Iterator[tuple[ShaFile, bytes | None]], 

3209 window_size: int | None = None, 

3210 progress: Callable[..., None] | None = None, 

3211) -> Iterator[UnpackedObject]: 

3212 """Create deltas from sorted objects. 

3213 

3214 Args: 

3215 objects: Iterator of sorted objects to deltify 

3216 window_size: Delta window size; None for default 

3217 progress: Optional progress reporting callback 

3218 

3219 Returns: 

3220 Iterator of UnpackedObject entries 

3221 """ 

3222 # TODO(jelmer): Use threads 

3223 if window_size is None: 

3224 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE 

3225 

3226 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque() 

3227 for i, (o, path) in enumerate(objects): 

3228 if progress is not None and i % 1000 == 0: 

3229 progress((f"generating deltas: {i}\r").encode()) 

3230 raw = o.as_raw_chunks() 

3231 winner = raw 

3232 winner_len = sum(map(len, winner)) 

3233 winner_base = None 

3234 for base_id, base_type_num, base in possible_bases: 

3235 if base_type_num != o.type_num: 

3236 continue 

3237 delta_len = 0 

3238 delta = [] 

3239 for chunk in create_delta(b"".join(base), b"".join(raw)): 

3240 delta_len += len(chunk) 

3241 if delta_len >= winner_len: 

3242 break 

3243 delta.append(chunk) 

3244 else: 

3245 winner_base = base_id 

3246 winner = delta 

3247 winner_len = sum(map(len, winner)) 

3248 yield UnpackedObject( 

3249 o.type_num, 

3250 sha=o.sha().digest(), 

3251 delta_base=winner_base, 

3252 decomp_len=winner_len, 

3253 decomp_chunks=winner, 

3254 ) 

3255 possible_bases.appendleft((o.sha().digest(), o.type_num, raw)) 

3256 while len(possible_bases) > window_size: 

3257 possible_bases.pop() 
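
# Round-trip sketch of the delta helpers used above: applying a delta produced
# by create_delta(base, target) back onto the base must reproduce the target.
base, target = b"the quick brown fox", b"the quick brown foxes"
delta = list(create_delta(base, target))
assert b"".join(apply_delta([base], delta)) == target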

3258 

3259 

3260def pack_objects_to_data( 

3261 objects: Sequence[ShaFile] 

3262 | Sequence[tuple[ShaFile, bytes | None]] 

3263 | Sequence[tuple[ShaFile, PackHint | None]], 

3264 *, 

3265 deltify: bool | None = None, 

3266 delta_window_size: int | None = None, 

3267 ofs_delta: bool = True, 

3268 progress: Callable[..., None] | None = None, 

3269) -> tuple[int, Iterator[UnpackedObject]]: 

3270 """Create pack data from objects. 

3271 

3272 Args: 

3273 objects: Pack objects 

3274 deltify: Whether to deltify pack objects 

3275 delta_window_size: Delta window size 

3276 ofs_delta: Whether to use offset deltas 

3277 progress: Optional progress reporting callback 

3278 Returns: Tuple of (object count, iterator over UnpackedObject entries) 

3279 """ 

3280 count = len(objects) 

3281 if deltify is None: 

3282 # PERFORMANCE/TODO(jelmer): This should be enabled but the python 

3283 # implementation is *much* too slow at the moment. 

3284 # Maybe consider enabling it just if the rust extension is available? 

3285 deltify = False 

3286 if deltify: 

3287 return ( 

3288 count, 

3289 deltify_pack_objects( 

3290 iter(objects), # type: ignore 

3291 window_size=delta_window_size, 

3292 progress=progress, 

3293 ), 

3294 ) 

3295 else: 

3296 

3297 def iter_without_path() -> Iterator[UnpackedObject]: 

3298 for o in objects: 

3299 if isinstance(o, tuple): 

3300 yield full_unpacked_object(o[0]) 

3301 else: 

3302 yield full_unpacked_object(o) 

3303 

3304 return (count, iter_without_path()) 

3305 

3306 

3307def generate_unpacked_objects( 

3308 container: PackedObjectContainer, 

3309 object_ids: Sequence[tuple[ObjectID, PackHint | None]], 

3310 delta_window_size: int | None = None, 

3311 deltify: bool | None = None, 

3312 reuse_deltas: bool = True, 

3313 ofs_delta: bool = True, 

3314 other_haves: set[ObjectID] | None = None, 

3315 progress: Callable[..., None] | None = None, 

3316) -> Iterator[UnpackedObject]: 

3317 """Create pack data from objects. 

3318 

3319 Returns: Iterator over UnpackedObject entries 

3320 """ 

3321 todo = dict(object_ids) 

3322 if reuse_deltas: 

3323 for unpack in find_reusable_deltas( 

3324 container, set(todo), other_haves=other_haves, progress=progress 

3325 ): 

3326 del todo[sha_to_hex(RawObjectID(unpack.sha()))] 

3327 yield unpack 

3328 if deltify is None: 

3329 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

3330 # slow at the moment. 

3331 deltify = False 

3332 if deltify: 

3333 objects_to_delta = container.iterobjects_subset( 

3334 todo.keys(), allow_missing=False 

3335 ) 

3336 sorted_objs = sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta) 

3337 yield from deltas_from_sorted_objects( 

3338 sorted_objs, 

3339 window_size=delta_window_size, 

3340 progress=progress, 

3341 ) 

3342 else: 

3343 for oid in todo: 

3344 yield full_unpacked_object(container[oid]) 

3345 

3346 

3347def full_unpacked_object(o: ShaFile) -> UnpackedObject: 

3348 """Create an UnpackedObject from a ShaFile. 

3349 

3350 Args: 

3351 o: ShaFile object to convert 

3352 

3353 Returns: 

3354 UnpackedObject with full object data 

3355 """ 

3356 return UnpackedObject( 

3357 o.type_num, 

3358 delta_base=None, 

3359 crc32=None, 

3360 decomp_chunks=o.as_raw_chunks(), 

3361 sha=o.sha().digest(), 

3362 ) 

3363 

3364 

3365def write_pack_from_container( 

3366 write: Callable[[bytes], None] 

3367 | Callable[[bytes | bytearray | memoryview], int] 

3368 | IO[bytes], 

3369 container: PackedObjectContainer, 

3370 object_ids: Sequence[tuple[ObjectID, PackHint | None]], 

3371 object_format: "ObjectFormat", 

3372 *, 

3373 delta_window_size: int | None = None, 

3374 deltify: bool | None = None, 

3375 reuse_deltas: bool = True, 

3376 compression_level: int = -1, 

3377 other_haves: set[ObjectID] | None = None, 

3378) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

3379 """Write a new pack data file. 

3380 

3381 Args: 

3382 write: write function to use 

3383 container: PackedObjectContainer 

3384 object_ids: Sequence of (object_id, hint) tuples to write 

3385 object_format: Object format (hash algorithm) to use 

3386 delta_window_size: Sliding window size for searching for deltas; 

3387 Set to None for default window size. 

3388 deltify: Whether to deltify objects 

3389 reuse_deltas: Whether to reuse existing deltas 

3390 compression_level: the zlib compression level to use 

3391 other_haves: Set of additional object IDs the receiver has 

3392 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3393 """ 

3394 pack_contents_count = len(object_ids) 

3395 pack_contents = generate_unpacked_objects( 

3396 container, 

3397 object_ids, 

3398 delta_window_size=delta_window_size, 

3399 deltify=deltify, 

3400 reuse_deltas=reuse_deltas, 

3401 other_haves=other_haves, 

3402 ) 

3403 

3404 return write_pack_data( 

3405 write, 

3406 pack_contents, 

3407 num_records=pack_contents_count, 

3408 compression_level=compression_level, 

3409 object_format=object_format, 

3410 ) 

3411 

3412 

3413def write_pack_objects( 

3414 write: Callable[[bytes], None] | IO[bytes], 

3415 objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]], 

3416 object_format: "ObjectFormat", 

3417 *, 

3418 delta_window_size: int | None = None, 

3419 deltify: bool | None = None, 

3420 compression_level: int = -1, 

3421) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

3422 """Write a new pack data file. 

3423 

3424 Args: 

3425 write: write function to use 

3426 objects: Sequence of (object, path) tuples to write 

3427 object_format: Object format (hash algorithm) to use 

3428 delta_window_size: Sliding window size for searching for deltas; 

3429 Set to None for default window size. 

3430 deltify: Whether to deltify objects 

3431 compression_level: the zlib compression level to use 

3432 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3433 """ 

3434 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify) 

3435 

3436 return write_pack_data( 

3437 write, 

3438 pack_contents, 

3439 num_records=pack_contents_count, 

3440 compression_level=compression_level, 

3441 object_format=object_format, 

3442 ) 

3443 

3444 

3445class PackChunkGenerator: 

3446 """Generator for pack data chunks.""" 

3447 

3448 def __init__( 

3449 self, 

3450 object_format: "ObjectFormat", 

3451 num_records: int | None = None, 

3452 records: Iterator[UnpackedObject] | None = None, 

3453 progress: Callable[..., None] | None = None, 

3454 compression_level: int = -1, 

3455 reuse_compressed: bool = True, 

3456 ) -> None: 

3457 """Initialize PackChunkGenerator. 

3458 

3459 Args: 

3460 object_format: Object format (hash algorithm) to use 

3461 num_records: Expected number of records 

3462 records: Iterator of pack records 

3463 progress: Optional progress callback 

3464 compression_level: Compression level (-1 for default) 

3465 reuse_compressed: Whether to reuse compressed chunks 

3466 """ 

3467 self.object_format = object_format 

3468 self.cs = object_format.new_hash() 

3469 self.entries: dict[bytes, tuple[int, int]] = {} 

3470 if records is None: 

3471 records = iter([]) # Empty iterator if None 

3472 self._it = self._pack_data_chunks( 

3473 records=records, 

3474 num_records=num_records, 

3475 progress=progress, 

3476 compression_level=compression_level, 

3477 reuse_compressed=reuse_compressed, 

3478 ) 

3479 

3480 def sha1digest(self) -> bytes: 

3481 """Return the checksum digest of the pack data (the hash algorithm follows the object format; the name is historical).""" 

3482 return self.cs.digest() 

3483 

3484 def __iter__(self) -> Iterator[bytes]: 

3485 """Iterate over pack data chunks.""" 

3486 return self._it 

3487 

3488 def _pack_data_chunks( 

3489 self, 

3490 records: Iterator[UnpackedObject], 

3491 *, 

3492 num_records: int | None = None, 

3493 progress: Callable[..., None] | None = None, 

3494 compression_level: int = -1, 

3495 reuse_compressed: bool = True, 

3496 ) -> Iterator[bytes]: 

3497 """Iterate pack data file chunks. 

3498 

3499 Args: 

3500 records: Iterator over UnpackedObject 

3501 num_records: Number of records (defaults to len(records) if not specified) 

3502 progress: Function to report progress to 

3503 compression_level: the zlib compression level 

3504 reuse_compressed: Whether to reuse compressed chunks 

3505 Yields: Pack data chunks; the final chunk is the pack checksum. Offsets and CRC32 checksums are recorded in self.entries. 

3506 """ 

3507 # Write the pack 

3508 if num_records is None: 

3509 num_records = len(records) # type: ignore 

3510 offset = 0 

3511 for chunk in pack_header_chunks(num_records): 

3512 yield chunk 

3513 self.cs.update(chunk) 

3514 offset += len(chunk) 

3515 actual_num_records = 0 

3516 for i, unpacked in enumerate(records): 

3517 type_num = unpacked.pack_type_num 

3518 if progress is not None and i % 1000 == 0: 

3519 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii")) 

3520 raw: list[bytes] | tuple[int, list[bytes]] | tuple[bytes, list[bytes]] 

3521 if unpacked.delta_base is not None: 

3522 assert isinstance(unpacked.delta_base, bytes), ( 

3523 f"Expected bytes, got {type(unpacked.delta_base)}" 

3524 ) 

3525 try: 

3526 base_offset, _base_crc32 = self.entries[unpacked.delta_base] 

3527 except KeyError: 

3528 type_num = REF_DELTA 

3529 assert isinstance(unpacked.delta_base, bytes) 

3530 raw = (unpacked.delta_base, unpacked.decomp_chunks) 

3531 else: 

3532 type_num = OFS_DELTA 

3533 raw = (offset - base_offset, unpacked.decomp_chunks) 

3534 else: 

3535 raw = unpacked.decomp_chunks 

3536 chunks: list[bytes] | Iterator[bytes] 

3537 if unpacked.comp_chunks is not None and reuse_compressed: 

3538 chunks = unpacked.comp_chunks 

3539 else: 

3540 chunks = pack_object_chunks( 

3541 type_num, 

3542 raw, 

3543 compression_level=compression_level, 

3544 object_format=self.object_format, 

3545 ) 

3546 crc32 = 0 

3547 object_size = 0 

3548 for chunk in chunks: 

3549 yield chunk 

3550 crc32 = binascii.crc32(chunk, crc32) 

3551 self.cs.update(chunk) 

3552 object_size += len(chunk) 

3553 actual_num_records += 1 

3554 self.entries[unpacked.sha()] = (offset, crc32) 

3555 offset += object_size 

3556 if actual_num_records != num_records: 

3557 raise AssertionError( 

3558 f"actual records written differs: {actual_num_records} != {num_records}" 

3559 ) 

3560 

3561 yield self.cs.digest() 

3562 
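
# Editorial sketch: streaming a pack with PackChunkGenerator rather than
# buffering it. full_unpacked_object() (its body appears earlier in this
# module) is assumed to accept a ShaFile; the final chunk yielded by the
# generator is the pack checksum trailer.
def _example_stream_pack(
    objects: "list[ShaFile]", object_format: "ObjectFormat"
) -> tuple[bytes, dict[bytes, tuple[int, int]]]:
    gen = PackChunkGenerator(
        object_format,
        num_records=len(objects),
        records=(full_unpacked_object(o) for o in objects),
    )
    data = b"".join(gen)  # header + object records + checksum trailer
    return data, gen.entries  # entries: raw sha -> (offset, crc32)
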

3563 

3564def write_pack_data( 

3565 write: Callable[[bytes], None] 

3566 | Callable[[bytes | bytearray | memoryview], int] 

3567 | IO[bytes], 

3568 records: Iterator[UnpackedObject], 

3569 object_format: "ObjectFormat", 

3570 *, 

3571 num_records: int | None = None, 

3572 progress: Callable[..., None] | None = None, 

3573 compression_level: int = -1, 

3574) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

3575 """Write a new pack data file. 

3576 

3577 Args: 

3578 write: Write function to use 

3579 num_records: Number of records (defaults to len(records) if None) 

3580 records: Iterator over UnpackedObject records to write 

3581 object_format: Object format (hash algorithm) to use 

3582 progress: Function to report progress to 

3583 compression_level: the zlib compression level 

3584 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3585 """ 

3586 chunk_generator = PackChunkGenerator( 

3587 num_records=num_records, 

3588 records=records, 

3589 progress=progress, 

3590 compression_level=compression_level, 

3591 object_format=object_format, 

3592 ) 

3593 for chunk in chunk_generator: 

3594 if callable(write): 

3595 write(chunk) 

3596 else: 

3597 write.write(chunk) 

3598 return chunk_generator.entries, chunk_generator.sha1digest() 

3599 

3600 

3601def write_pack_index_v1( 

3602 f: IO[bytes], 

3603 entries: Iterable[tuple[bytes, int, int | None]], 

3604 pack_checksum: bytes, 

3605) -> bytes: 

3606 """Write a new pack index file. 

3607 

3608 Args: 

3609 f: A file-like object to write to 

3610 entries: List of tuples with object name (sha), offset_in_pack, 

3611 and crc32_checksum. 

3612 pack_checksum: Checksum of the pack file. 

3613 Returns: The SHA of the written index file 

3614 """ 

3615 f = SHA1Writer(f) 

3616 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3617 for name, _offset, _entry_checksum in entries: 

3618 fan_out_table[ord(name[:1])] += 1 

3619 # Fan-out table 

3620 for i in range(0x100): 

3621 f.write(struct.pack(">L", fan_out_table[i])) 

3622 fan_out_table[i + 1] += fan_out_table[i] 

3623 for name, offset, _entry_checksum in entries: 

3624 if len(name) != 20: 

3625 raise TypeError("pack index v1 only supports SHA-1 names") 

3626 if not (offset <= 0xFFFFFFFF): 

3627 raise TypeError("pack index v1 only supports 32-bit offsets (< 4 GiB)") 

3628 f.write(struct.pack(">L20s", offset, name)) 

3629 assert len(pack_checksum) == 20 

3630 f.write(pack_checksum) 

3631 return f.write_sha() 

3632 
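
# Editorial illustration of the fan-out table written above: entry b holds
# the number of object names whose first byte is <= b, so a lookup only needs
# to binary-search the bucket for its first byte.
def _example_fan_out() -> None:
    # Three object names starting with bytes 0x00, 0x00 and 0x01:
    counts = {0x00: 2, 0x01: 1}
    cumulative, total = [], 0
    for b in range(0x100):
        total += counts.get(b, 0)
        cumulative.append(total)
    assert cumulative[0x00] == 2
    assert cumulative[0x01] == 3
    assert cumulative[0xFF] == 3  # last entry equals the total object count
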

3633 

3634def _delta_encode_size(size: int) -> bytes: 

3635 ret = bytearray() 

3636 c = size & 0x7F 

3637 size >>= 7 

3638 while size: 

3639 ret.append(c | 0x80) 

3640 c = size & 0x7F 

3641 size >>= 7 

3642 ret.append(c) 

3643 return bytes(ret) 

3644 
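
# Worked example of the delta header size encoding above: a little-endian
# base-128 varint in which the low seven bits come first and the high bit of
# each byte marks continuation.
def _example_delta_encode_size() -> None:
    assert _delta_encode_size(0) == b"\x00"
    assert _delta_encode_size(127) == b"\x7f"
    # 1000 = 0b111_1101000: low 7 bits 0x68 (flagged as 0xE8), then 0b111.
    assert _delta_encode_size(1000) == b"\xe8\x07"
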

3645 

3646# The length of delta compression copy operations in version 2 packs is limited 

3647# to 64K. To copy more, we use several copy operations. Version 3 packs allow 

3648# 24-bit lengths in copy operations, but we always make version 2 packs. 

3649_MAX_COPY_LEN = 0xFFFF 

3650 

3651 

3652def _encode_copy_operation(start: int, length: int) -> bytes: 

3653 scratch = bytearray([0x80]) 

3654 for i in range(4): 

3655 if start & 0xFF << i * 8: 

3656 scratch.append((start >> i * 8) & 0xFF) 

3657 scratch[0] |= 1 << i 

3658 for i in range(2): 

3659 if length & 0xFF << i * 8: 

3660 scratch.append((length >> i * 8) & 0xFF) 

3661 scratch[0] |= 1 << (4 + i) 

3662 return bytes(scratch) 

3663 
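
# Worked example of the copy opcode layout: the flag byte starts at 0x80,
# bits 0-3 record which offset bytes follow, bits 4-5 which length bytes;
# zero-valued bytes are omitted entirely.
def _example_encode_copy_operation() -> None:
    # Copy 16 bytes from offset 0: flags 0x90 (length bit 4 set), then 0x10.
    assert _encode_copy_operation(0, 16) == b"\x90\x10"
    # Offset 0x1234, length 0x5678: payload bytes are emitted little-endian.
    assert _encode_copy_operation(0x1234, 0x5678) == b"\xb3\x34\x12\x78\x56"
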

3664 

3665def _create_delta_py(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3666 """Yield delta opcodes, computed with difflib, that transform base_buf into target_buf. 

3667 

3668 Args: 

3669 base_buf: Base buffer 

3670 target_buf: Target buffer 

3671 """ 

3672 if isinstance(base_buf, list): 

3673 base_buf = b"".join(base_buf) 

3674 if isinstance(target_buf, list): 

3675 target_buf = b"".join(target_buf) 

3676 assert isinstance(base_buf, bytes) 

3677 assert isinstance(target_buf, bytes) 

3678 # write delta header 

3679 yield _delta_encode_size(len(base_buf)) 

3680 yield _delta_encode_size(len(target_buf)) 

3681 # write out delta opcodes 

3682 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf) 

3683 for opcode, i1, i2, j1, j2 in seq.get_opcodes(): 

3684 # Git patch opcodes don't care about deletes! 

3685 # if opcode == 'replace' or opcode == 'delete': 

3686 # pass 

3687 if opcode == "equal": 

3688 # If they are equal, unpacker will use data from base_buf 

3689 # Write out an opcode that says what range to use 

3690 copy_start = i1 

3691 copy_len = i2 - i1 

3692 while copy_len > 0: 

3693 to_copy = min(copy_len, _MAX_COPY_LEN) 

3694 yield _encode_copy_operation(copy_start, to_copy) 

3695 copy_start += to_copy 

3696 copy_len -= to_copy 

3697 if opcode == "replace" or opcode == "insert": 

3698 # If we are replacing a range or adding one, then we just 

3699 # output it to the stream (prefixed by its size) 

3700 s = j2 - j1 

3701 o = j1 

3702 while s > 127: 

3703 yield bytes([127]) 

3704 yield bytes(memoryview(target_buf)[o : o + 127]) 

3705 s -= 127 

3706 o += 127 

3707 yield bytes([s]) 

3708 yield bytes(memoryview(target_buf)[o : o + s]) 

3709 

3710 

3711# Default to pure Python implementation 

3712create_delta = _create_delta_py 

3713 

3714 

3715def apply_delta( 

3716 src_buf: bytes | list[bytes], delta: bytes | list[bytes] 

3717) -> list[bytes]: 

3718 """Apply a delta to a source buffer, returning the target as a list of byte chunks; based on git's patch-delta.c. 

3719 

3720 Args: 

3721 src_buf: Source buffer 

3722 delta: Delta instructions 

3723 """ 

3724 if not isinstance(src_buf, bytes): 

3725 src_buf = b"".join(src_buf) 

3726 if not isinstance(delta, bytes): 

3727 delta = b"".join(delta) 

3728 out = [] 

3729 index = 0 

3730 delta_length = len(delta) 

3731 

3732 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]: 

3733 size = 0 

3734 i = 0 

3735 while delta: 

3736 cmd = ord(delta[index : index + 1]) 

3737 index += 1 

3738 size |= (cmd & ~0x80) << i 

3739 i += 7 

3740 if not cmd & 0x80: 

3741 break 

3742 return size, index 

3743 

3744 src_size, index = get_delta_header_size(delta, index) 

3745 dest_size, index = get_delta_header_size(delta, index) 

3746 if src_size != len(src_buf): 

3747 raise ApplyDeltaError( 

3748 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}" 

3749 ) 

3750 while index < delta_length: 

3751 cmd = ord(delta[index : index + 1]) 

3752 index += 1 

3753 if cmd & 0x80: 

3754 cp_off = 0 

3755 for i in range(4): 

3756 if cmd & (1 << i): 

3757 x = ord(delta[index : index + 1]) 

3758 index += 1 

3759 cp_off |= x << (i * 8) 

3760 cp_size = 0 

3761 # Version 3 packs can contain copy sizes larger than 64K. 

3762 for i in range(3): 

3763 if cmd & (1 << (4 + i)): 

3764 x = ord(delta[index : index + 1]) 

3765 index += 1 

3766 cp_size |= x << (i * 8) 

3767 if cp_size == 0: 

3768 cp_size = 0x10000 

3769 if ( 

3770 cp_off + cp_size < cp_size 

3771 or cp_off + cp_size > src_size 

3772 or cp_size > dest_size 

3773 ): 

3774 break 

3775 out.append(src_buf[cp_off : cp_off + cp_size]) 

3776 elif cmd != 0: 

3777 out.append(delta[index : index + cmd]) 

3778 index += cmd 

3779 else: 

3780 raise ApplyDeltaError("Invalid opcode 0") 

3781 

3782 if index != delta_length: 

3783 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}") 

3784 

3785 if dest_size != chunks_length(out): 

3786 raise ApplyDeltaError("dest size incorrect") 

3787 

3788 return out 

3789 

3790 
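
# Editorial sketch of the round-trip contract between create_delta() and
# apply_delta(): applying a computed delta to the base reproduces the target.
def _example_delta_round_trip() -> None:
    base = b"the quick brown fox jumps over the lazy dog"
    target = b"the quick brown fox leaps over the lazy cat"
    delta = b"".join(create_delta(base, target))
    assert b"".join(apply_delta(base, delta)) == target
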

3791def write_pack_index_v2( 

3792 f: IO[bytes], 

3793 entries: Iterable[tuple[bytes, int, int | None]], 

3794 pack_checksum: bytes, 

3795) -> bytes: 

3796 """Write a new pack index file. 

3797 

3798 Args: 

3799 f: File-like object to write to 

3800 entries: List of tuples with object name (sha), offset_in_pack, and 

3801 crc32_checksum. 

3802 pack_checksum: Checksum of the pack file. 

3803 Returns: The checksum of the index file written 

3804 """ 

3805 # Determine hash algorithm from pack_checksum length 

3806 if len(pack_checksum) == 20: 

3807 hash_func = sha1 

3808 elif len(pack_checksum) == 32: 

3809 hash_func = sha256 

3810 else: 

3811 raise ValueError(f"Unsupported pack checksum length: {len(pack_checksum)}") 

3812 

3813 f_writer = HashWriter(f, hash_func) 

3814 f_writer.write(b"\377tOc") # Magic! 

3815 f_writer.write(struct.pack(">L", 2)) 

3816 

3817 # Convert to list to allow multiple iterations 

3818 entries_list = list(entries) 

3819 

3820 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3821 for name, offset, entry_checksum in entries_list: 

3822 fan_out_table[ord(name[:1])] += 1 

3823 

3824 if entries_list: 

3825 hash_size = len(entries_list[0][0]) 

3826 else: 

3827 hash_size = len(pack_checksum) # Use pack_checksum length as hash size 

3828 

3829 # Fan-out table 

3830 largetable: list[int] = [] 

3831 for i in range(0x100): 

3832 f_writer.write(struct.pack(b">L", fan_out_table[i])) 

3833 fan_out_table[i + 1] += fan_out_table[i] 

3834 for name, offset, entry_checksum in entries_list: 

3835 if len(name) != hash_size: 

3836 raise TypeError( 

3837 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3838 ) 

3839 f_writer.write(name) 

3840 for name, offset, entry_checksum in entries_list: 

3841 f_writer.write(struct.pack(b">L", entry_checksum)) 

3842 for name, offset, entry_checksum in entries_list: 

3843 if offset < 2**31: 

3844 f_writer.write(struct.pack(b">L", offset)) 

3845 else: 

3846 f_writer.write(struct.pack(b">L", 2**31 + len(largetable))) 

3847 largetable.append(offset) 

3848 for offset in largetable: 

3849 f_writer.write(struct.pack(b">Q", offset)) 

3850 f_writer.write(pack_checksum) 

3851 return f_writer.write_hash() 

3852 

3853 

3854def write_pack_index_v3( 

3855 f: IO[bytes], 

3856 entries: Iterable[tuple[bytes, int, int | None]], 

3857 pack_checksum: bytes, 

3858 hash_format: int = 1, 

3859) -> bytes: 

3860 """Write a new pack index file in v3 format. 

3861 

3862 Args: 

3863 f: File-like object to write to 

3864 entries: List of tuples with object name (sha), offset_in_pack, and 

3865 crc32_checksum. 

3866 pack_checksum: Checksum of the pack file. 

3867 hash_format: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

3868 Returns: The SHA of the index file written 

3869 """ 

3870 if hash_format == 1: 

3871 hash_size = 20 # SHA-1 

3872 writer_cls = SHA1Writer 

3873 elif hash_format == 2: 

3874 hash_size = 32 # SHA-256 

3875 # TODO: Add SHA256Writer when SHA-256 support is implemented 

3876 raise NotImplementedError("SHA-256 support not yet implemented") 

3877 else: 

3878 raise ValueError(f"Unknown hash algorithm {hash_format}") 

3879 

3880 # Convert entries to list to allow multiple iterations 

3881 entries_list = list(entries) 

3882 

3883 # Calculate shortest unambiguous prefix length for object names 

3884 # For now, use full hash size (this could be optimized) 

3885 shortened_oid_len = hash_size 

3886 

3887 f = writer_cls(f) 

3888 f.write(b"\377tOc") # Magic! 

3889 f.write(struct.pack(">L", 3)) # Version 3 

3890 f.write(struct.pack(">L", hash_format)) # Hash algorithm 

3891 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length 

3892 

3893 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3894 for name, offset, entry_checksum in entries_list: 

3895 if len(name) != hash_size: 

3896 raise ValueError( 

3897 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3898 ) 

3899 fan_out_table[ord(name[:1])] += 1 

3900 

3901 # Fan-out table 

3902 largetable: list[int] = [] 

3903 for i in range(0x100): 

3904 f.write(struct.pack(b">L", fan_out_table[i])) 

3905 fan_out_table[i + 1] += fan_out_table[i] 

3906 

3907 # Object names table 

3908 for name, offset, entry_checksum in entries_list: 

3909 f.write(name) 

3910 

3911 # CRC32 checksums table 

3912 for name, offset, entry_checksum in entries_list: 

3913 f.write(struct.pack(b">L", entry_checksum)) 

3914 

3915 # Offset table 

3916 for name, offset, entry_checksum in entries_list: 

3917 if offset < 2**31: 

3918 f.write(struct.pack(b">L", offset)) 

3919 else: 

3920 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3921 largetable.append(offset) 

3922 

3923 # Large offset table 

3924 for offset in largetable: 

3925 f.write(struct.pack(b">Q", offset)) 

3926 

3927 assert len(pack_checksum) == hash_size, ( 

3928 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}" 

3929 ) 

3930 f.write(pack_checksum) 

3931 return f.write_sha() 

3932 

3933 

3934def write_pack_index( 

3935 f: IO[bytes], 

3936 entries: Iterable[tuple[bytes, int, int | None]], 

3937 pack_checksum: bytes, 

3938 progress: Callable[..., None] | None = None, 

3939 version: int | None = None, 

3940) -> bytes: 

3941 """Write a pack index file. 

3942 

3943 Args: 

3944 f: File-like object to write to. 

3945 entries: List of (checksum, offset, crc32) tuples 

3946 pack_checksum: Checksum of the pack file. 

3947 progress: Progress function (not currently used) 

3948 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION. 

3949 

3950 Returns: 

3951 SHA of the written index file 

3952 

3953 Raises: 

3954 ValueError: If an unsupported version is specified 

3955 """ 

3956 if version is None: 

3957 version = DEFAULT_PACK_INDEX_VERSION 

3958 

3959 if version == 1: 

3960 return write_pack_index_v1(f, entries, pack_checksum) 

3961 elif version == 2: 

3962 return write_pack_index_v2(f, entries, pack_checksum) 

3963 elif version == 3: 

3964 return write_pack_index_v3(f, entries, pack_checksum) 

3965 else: 

3966 raise ValueError(f"Unsupported pack index version: {version}") 

3967 
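
# Editorial sketch: writing an index for a finished pack. The entries must
# already be sorted by object name, as Pack.sorted_entries() returns them;
# the filename is illustrative.
def _example_write_index(
    entries: "list[tuple[bytes, int, int]]", pack_checksum: bytes
) -> bytes:
    with open("example.idx", "wb") as f:
        return write_pack_index(f, entries, pack_checksum, version=2)
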

3968 

3969class Pack: 

3970 """A Git pack object.""" 

3971 

3972 _data_load: Callable[[], PackData] | None 

3973 _idx_load: Callable[[], PackIndex] | None 

3974 

3975 _data: PackData | None 

3976 _idx: PackIndex | None 

3977 _bitmap: "PackBitmap | None" 

3978 

3979 def __init__( 

3980 self, 

3981 basename: str, 

3982 *, 

3983 object_format: ObjectFormat, 

3984 resolve_ext_ref: ResolveExtRefFn | None = None, 

3985 delta_window_size: int | None = None, 

3986 window_memory: int | None = None, 

3987 delta_cache_size: int | None = None, 

3988 depth: int | None = None, 

3989 threads: int | None = None, 

3990 big_file_threshold: int | None = None, 

3991 delta_base_cache_limit: int | None = None, 

3992 ) -> None: 

3993 """Initialize a Pack object. 

3994 

3995 Args: 

3996 basename: Base path for pack files (without .pack/.idx extension) 

3997 object_format: Hash algorithm used by the repository 

3998 resolve_ext_ref: Optional function to resolve external references 

3999 delta_window_size: Size of the delta compression window 

4000 window_memory: Memory limit for delta compression window 

4001 delta_cache_size: Size of the delta cache 

4002 depth: Maximum depth for delta chains 

4003 threads: Number of threads to use for operations 

4004 big_file_threshold: Size threshold for big file handling 

4005 delta_base_cache_limit: Maximum bytes for delta base object cache 

4006 """ 

4007 self._basename = basename 

4008 self.object_format = object_format 

4009 self._data = None 

4010 self._idx = None 

4011 self._bitmap = None 

4012 self._idx_path = self._basename + ".idx" 

4013 self._data_path = self._basename + ".pack" 

4014 self._bitmap_path = self._basename + ".bitmap" 

4015 self.delta_window_size = delta_window_size 

4016 self.window_memory = window_memory 

4017 self.delta_cache_size = delta_cache_size 

4018 self.depth = depth 

4019 self.threads = threads 

4020 self.big_file_threshold = big_file_threshold 

4021 self.delta_base_cache_limit = delta_base_cache_limit 

4022 self._idx_load = lambda: load_pack_index(self._idx_path, object_format) 

4023 self._data_load = lambda: PackData( 

4024 self._data_path, 

4025 delta_window_size=delta_window_size, 

4026 window_memory=window_memory, 

4027 delta_cache_size=delta_cache_size, 

4028 depth=depth, 

4029 threads=threads, 

4030 big_file_threshold=big_file_threshold, 

4031 delta_base_cache_limit=delta_base_cache_limit, 

4032 object_format=object_format, 

4033 ) 

4034 self.resolve_ext_ref = resolve_ext_ref 

4035 

4036 @classmethod 

4037 def from_lazy_objects( 

4038 cls, 

4039 data_fn: Callable[[], PackData], 

4040 idx_fn: Callable[[], PackIndex], 

4041 ) -> "Pack": 

4042 """Create a new pack object from callables to load pack data and index objects.""" 

4043 # Load index to get object format 

4044 idx = idx_fn() 

4045 ret = cls("", object_format=idx.object_format) 

4046 ret._data_load = data_fn 

4047 ret._idx = idx 

4048 ret._idx_load = None 

4049 return ret 

4050 

4051 @classmethod 

4052 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack": 

4053 """Create a new pack object from pack data and index objects.""" 

4054 ret = cls("", object_format=idx.object_format) 

4055 ret._data = data 

4056 ret._data_load = None 

4057 ret._idx = idx 

4058 ret._idx_load = None 

4059 ret.check_length_and_checksum() 

4060 return ret 

4061 

4062 def name(self) -> bytes: 

4063 """Return the pack's name: a hash computed over the sorted SHAs of the objects it contains.""" 

4064 return self.index.objects_sha1() 

4065 

4066 @property 

4067 def data(self) -> PackData: 

4068 """The pack data object being used.""" 

4069 if self._data is None: 

4070 assert self._data_load 

4071 self._data = self._data_load() 

4072 self.check_length_and_checksum() 

4073 return self._data 

4074 

4075 @property 

4076 def index(self) -> PackIndex: 

4077 """The index being used. 

4078 

4079 Note: This may be an in-memory index 

4080 """ 

4081 if self._idx is None: 

4082 assert self._idx_load 

4083 self._idx = self._idx_load() 

4084 return self._idx 

4085 

4086 @property 

4087 def bitmap(self) -> "PackBitmap | None": 

4088 """The bitmap being used, if available. 

4089 

4090 Returns: 

4091 PackBitmap instance or None if no bitmap exists 

4092 

4093 Raises: 

4094 ValueError: If bitmap file is invalid or corrupt 

4095 """ 

4096 if self._bitmap is None: 

4097 from .bitmap import read_bitmap 

4098 

4099 self._bitmap = read_bitmap(self._bitmap_path, pack_index=self.index) 

4100 return self._bitmap 

4101 

4102 def ensure_bitmap( 

4103 self, 

4104 object_store: "BaseObjectStore", 

4105 refs: dict["Ref", "ObjectID"], 

4106 commit_interval: int | None = None, 

4107 progress: Callable[[str], None] | None = None, 

4108 ) -> "PackBitmap": 

4109 """Ensure a bitmap exists for this pack, generating one if needed. 

4110 

4111 Args: 

4112 object_store: Object store to read objects from 

4113 refs: Dictionary of ref names to commit SHAs 

4114 commit_interval: Include every Nth commit in bitmap index 

4115 progress: Optional progress reporting callback 

4116 

4117 Returns: 

4118 PackBitmap instance (either existing or newly generated) 

4119 """ 

4120 from .bitmap import generate_bitmap, write_bitmap 

4121 

4122 # Check if bitmap already exists 

4123 try: 

4124 existing = self.bitmap 

4125 if existing is not None: 

4126 return existing 

4127 except FileNotFoundError: 

4128 pass # No bitmap, we'll generate one 

4129 

4130 # Generate new bitmap 

4131 if progress: 

4132 progress(f"Generating bitmap for {self.name().decode('utf-8')}...\n") 

4133 

4134 pack_bitmap = generate_bitmap( 

4135 self.index, 

4136 object_store, 

4137 refs, 

4138 self.get_stored_checksum(), 

4139 commit_interval=commit_interval, 

4140 progress=progress, 

4141 ) 

4142 

4143 # Write bitmap file 

4144 write_bitmap(self._bitmap_path, pack_bitmap) 

4145 

4146 if progress: 

4147 progress(f"Wrote {self._bitmap_path}\n") 

4148 

4149 # Update cached bitmap 

4150 self._bitmap = pack_bitmap 

4151 

4152 return pack_bitmap 

4153 

4154 @property 

4155 def mmap_size(self) -> int: 

4156 """Return the total mmapped memory usage of this pack. 

4157 

4158 This includes the pack data file and index file sizes, 

4159 but only for components that have been loaded (and thus mmapped). 

4160 """ 

4161 total = 0 

4162 if self._data is not None: 

4163 total += self._data._get_size() 

4164 if self._idx is not None and isinstance(self._idx, FilePackIndex): 

4165 total += self._idx._size 

4166 return total 

4167 

4168 def close(self) -> None: 

4169 """Close the pack file and index.""" 

4170 if self._data is not None: 

4171 self._data.close() 

4172 self._data = None 

4173 if self._idx is not None: 

4174 self._idx.close() 

4175 self._idx = None 

4176 

4177 def __del__(self) -> None: 

4178 """Ensure pack file is closed when Pack is garbage collected.""" 

4179 if self._data is not None or self._idx is not None: 

4180 import warnings 

4181 

4182 warnings.warn( 

4183 f"unclosed Pack {self!r}", ResourceWarning, stacklevel=2, source=self 

4184 ) 

4185 try: 

4186 self.close() 

4187 except Exception: 

4188 # Ignore errors during cleanup 

4189 pass 

4190 

4191 def __enter__(self) -> "Pack": 

4192 """Enter context manager.""" 

4193 return self 

4194 

4195 def __exit__( 

4196 self, 

4197 exc_type: type | None, 

4198 exc_val: BaseException | None, 

4199 exc_tb: TracebackType | None, 

4200 ) -> None: 

4201 """Exit context manager.""" 

4202 self.close() 

4203 

4204 def __eq__(self, other: object) -> bool: 

4205 """Check equality with another pack.""" 

4206 if not isinstance(other, Pack): 

4207 return False 

4208 return self.index == other.index 

4209 

4210 def __len__(self) -> int: 

4211 """Number of entries in this pack.""" 

4212 return len(self.index) 

4213 

4214 def __repr__(self) -> str: 

4215 """Return string representation of this pack.""" 

4216 return f"{self.__class__.__name__}({self._basename!r})" 

4217 

4218 def __iter__(self) -> Iterator[ObjectID]: 

4219 """Iterate over all the sha1s of the objects in this pack.""" 

4220 return iter(self.index) 

4221 

4222 def check_length_and_checksum(self) -> None: 

4223 """Sanity check the length and checksum of the pack index and data.""" 

4224 assert len(self.index) == len(self.data), ( 

4225 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)" 

4226 ) 

4227 idx_stored_checksum = self.index.get_pack_checksum() 

4228 data_stored_checksum = self.data.get_stored_checksum() 

4229 if ( 

4230 idx_stored_checksum is not None 

4231 and idx_stored_checksum != data_stored_checksum 

4232 ): 

4233 raise ChecksumMismatch( 

4234 sha_to_hex(RawObjectID(idx_stored_checksum)), 

4235 sha_to_hex(RawObjectID(data_stored_checksum)), 

4236 ) 

4237 

4238 def check(self) -> None: 

4239 """Check the integrity of this pack. 

4240 

4241 Raises: 

4242 ChecksumMismatch: if a checksum for the index or data is wrong 

4243 """ 

4244 self.index.check() 

4245 self.data.check() 

4246 for obj in self.iterobjects(): 

4247 obj.check() 

4248 # TODO: object connectivity checks 

4249 

4250 def get_stored_checksum(self) -> bytes: 

4251 """Return the stored checksum of the pack data.""" 

4252 return self.data.get_stored_checksum() 

4253 

4254 def pack_tuples(self) -> list[tuple[ShaFile, None]]: 

4255 """Return pack tuples for all objects in pack.""" 

4256 return [(o, None) for o in self.iterobjects()] 

4257 

4258 def __contains__(self, sha1: ObjectID | RawObjectID) -> bool: 

4259 """Check whether this pack contains a particular SHA1.""" 

4260 try: 

4261 self.index.object_offset(sha1) 

4262 return True 

4263 except KeyError: 

4264 return False 

4265 

4266 def get_raw(self, sha1: RawObjectID | ObjectID) -> tuple[int, bytes]: 

4267 """Get raw object data by SHA1.""" 

4268 offset = self.index.object_offset(sha1) 

4269 obj_type, obj = self.data.get_object_at(offset) 

4270 type_num, chunks = self.resolve_object(offset, obj_type, obj) 

4271 return type_num, b"".join(chunks) # type: ignore[arg-type] 

4272 

4273 def __getitem__(self, sha1: "ObjectID | RawObjectID") -> ShaFile: 

4274 """Retrieve the specified SHA1.""" 

4275 type, uncomp = self.get_raw(sha1) 

4276 return ShaFile.from_raw_string(type, uncomp, sha=sha1) 

4277 

4278 def iterobjects(self) -> Iterator[ShaFile]: 

4279 """Iterate over the objects in this pack.""" 

4280 return iter( 

4281 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref) 

4282 ) 

4283 

4284 def iterobjects_subset( 

4285 self, shas: Iterable[ObjectID], *, allow_missing: bool = False 

4286 ) -> Iterator[ShaFile]: 

4287 """Iterate over a subset of objects in this pack.""" 

4288 return ( 

4289 uo 

4290 for uo in PackInflater.for_pack_subset( 

4291 self, 

4292 shas, 

4293 allow_missing=allow_missing, 

4294 resolve_ext_ref=self.resolve_ext_ref, 

4295 ) 

4296 if uo.id in shas 

4297 ) 

4298 

4299 def iter_unpacked_subset( 

4300 self, 

4301 shas: Iterable[ObjectID | RawObjectID], 

4302 *, 

4303 include_comp: bool = False, 

4304 allow_missing: bool = False, 

4305 convert_ofs_delta: bool = False, 

4306 ) -> Iterator[UnpackedObject]: 

4307 """Iterate over unpacked objects in subset.""" 

4308 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list) 

4309 ofs: dict[int, bytes] = {} 

4310 todo: set[ObjectID | RawObjectID] = set(shas) 

4311 for unpacked in self.iter_unpacked(include_comp=include_comp): 

4312 sha = unpacked.sha() 

4313 if unpacked.offset is not None: 

4314 ofs[unpacked.offset] = sha 

4315 hexsha = sha_to_hex(RawObjectID(sha)) 

4316 if hexsha in todo: 

4317 if unpacked.pack_type_num == OFS_DELTA: 

4318 assert isinstance(unpacked.delta_base, int) 

4319 assert unpacked.offset is not None 

4320 base_offset = unpacked.offset - unpacked.delta_base 

4321 try: 

4322 unpacked.delta_base = ofs[base_offset] 

4323 except KeyError: 

4324 ofs_pending[base_offset].append(unpacked) 

4325 continue 

4326 else: 

4327 unpacked.pack_type_num = REF_DELTA 

4328 yield unpacked 

4329 todo.remove(hexsha) 

4330 if unpacked.offset is not None: 

4331 for child in ofs_pending.pop(unpacked.offset, []): 

4332 child.pack_type_num = REF_DELTA 

4333 child.delta_base = sha 

4334 yield child 

4335 assert not ofs_pending 

4336 if not allow_missing and todo: 

4337 raise UnresolvedDeltas(list(todo)) 

4338 

4339 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]: 

4340 """Iterate over all unpacked objects in this pack.""" 

4341 ofs_to_entries = { 

4342 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries() 

4343 } 

4344 for unpacked in self.data.iter_unpacked(include_comp=include_comp): 

4345 assert unpacked.offset is not None 

4346 (sha, crc32) = ofs_to_entries[unpacked.offset] 

4347 unpacked._sha = sha 

4348 unpacked.crc32 = crc32 

4349 yield unpacked 

4350 

4351 def keep(self, msg: bytes | None = None) -> str: 

4352 """Add a .keep file for the pack, preventing git from garbage collecting it. 

4353 

4354 Args: 

4355 msg: A message written inside the .keep file; can be used later 

4356 to determine whether or not a .keep file is obsolete. 

4357 Returns: The path of the .keep file, as a string. 

4358 """ 

4359 keepfile_name = f"{self._basename}.keep" 

4360 with GitFile(keepfile_name, "wb") as keepfile: 

4361 if msg: 

4362 keepfile.write(msg) 

4363 keepfile.write(b"\n") 

4364 return keepfile_name 

4365 

4366 def get_ref( 

4367 self, sha: RawObjectID | ObjectID 

4368 ) -> tuple[int | None, int, OldUnpackedObject]: 

4369 """Get the object for a ref SHA, only looking in this pack.""" 

4370 # TODO: cache these results 

4371 try: 

4372 offset = self.index.object_offset(sha) 

4373 except KeyError: 

4374 offset = None 

4375 if offset: 

4376 type, obj = self.data.get_object_at(offset) 

4377 elif self.resolve_ext_ref: 

4378 type, obj = self.resolve_ext_ref(sha) 

4379 else: 

4380 raise KeyError(sha) 

4381 return offset, type, obj 

4382 

4383 def resolve_object( 

4384 self, 

4385 offset: int, 

4386 type: int, 

4387 obj: OldUnpackedObject, 

4388 get_ref: Callable[ 

4389 [RawObjectID | ObjectID], tuple[int | None, int, OldUnpackedObject] 

4390 ] 

4391 | None = None, 

4392 ) -> tuple[int, OldUnpackedObject]: 

4393 """Resolve an object, possibly resolving deltas when necessary. 

4394 

4395 Returns: Tuple with object type and contents. 

4396 """ 

4397 # Walk down the delta chain, building a stack of deltas to reach 

4398 # the requested object. 

4399 base_offset: int | None = offset 

4400 base_type = type 

4401 base_obj = obj 

4402 delta_stack = [] 

4403 while base_type in DELTA_TYPES: 

4404 prev_offset = base_offset 

4405 if get_ref is None: 

4406 get_ref = self.get_ref 

4407 if base_type == OFS_DELTA: 

4408 (delta_offset, delta) = base_obj 

4409 # TODO: clean up asserts and replace with nicer error messages 

4410 assert isinstance(delta_offset, int), ( 

4411 f"Expected int, got {delta_offset.__class__}" 

4412 ) 

4413 assert base_offset is not None 

4414 base_offset = base_offset - delta_offset 

4415 base_type, base_obj = self.data.get_object_at(base_offset) 

4416 assert isinstance(base_type, int) 

4417 elif base_type == REF_DELTA: 

4418 (basename, delta) = base_obj 

4419 assert ( 

4420 isinstance(basename, bytes) 

4421 and len(basename) == self.object_format.oid_length 

4422 ) 

4423 base_offset_temp, base_type, base_obj = get_ref(RawObjectID(basename)) 

4424 assert isinstance(base_type, int) 

4425 # base_offset_temp can be None for thin packs (external references) 

4426 base_offset = base_offset_temp 

4427 if base_offset == prev_offset: # object is based on itself 

4428 raise UnresolvedDeltas([basename]) 

4429 delta_stack.append((prev_offset, base_type, delta)) 

4430 

4431 # Now grab the base object (mustn't be a delta) and apply the 

4432 # deltas all the way up the stack. 

4433 chunks = base_obj 

4434 for prev_offset, _delta_type, delta in reversed(delta_stack): 

4435 # Convert chunks to bytes for apply_delta if needed 

4436 if isinstance(chunks, list): 

4437 chunks_bytes = b"".join(chunks) 

4438 elif isinstance(chunks, tuple): 

4439 # For tuple type, second element is the actual data 

4440 _, chunk_data = chunks 

4441 if isinstance(chunk_data, list): 

4442 chunks_bytes = b"".join(chunk_data) 

4443 else: 

4444 chunks_bytes = chunk_data 

4445 else: 

4446 chunks_bytes = chunks 

4447 

4448 # Apply delta and get result as list 

4449 chunks = apply_delta(chunks_bytes, delta) 

4450 

4451 if prev_offset is not None: 

4452 self.data._offset_cache[prev_offset] = base_type, chunks 

4453 return base_type, chunks 

4454 

4455 def entries( 

4456 self, progress: Callable[[int, int], None] | None = None 

4457 ) -> Iterator[PackIndexEntry]: 

4458 """Yield entries summarizing the contents of this pack. 

4459 

4460 Args: 

4461 progress: Progress function, called with current and total 

4462 object count. 

4463 Returns: iterator of tuples with (sha, offset, crc32) 

4464 """ 

4465 return self.data.iterentries( 

4466 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

4467 ) 

4468 

4469 def sorted_entries( 

4470 self, progress: Callable[[int, int], None] | None = None 

4471 ) -> Iterator[PackIndexEntry]: 

4472 """Return entries in this pack, sorted by SHA. 

4473 

4474 Args: 

4475 progress: Progress function, called with current and total 

4476 object count 

4477 Returns: Iterator of tuples with (sha, offset, crc32) 

4478 """ 

4479 return iter( 

4480 self.data.sorted_entries( 

4481 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

4482 ) 

4483 ) 

4484 

4485 def get_unpacked_object( 

4486 self, 

4487 sha: ObjectID | RawObjectID, 

4488 *, 

4489 include_comp: bool = False, 

4490 convert_ofs_delta: bool = True, 

4491 ) -> UnpackedObject: 

4492 """Get the unpacked object for a sha. 

4493 

4494 Args: 

4495 sha: SHA of object to fetch 

4496 include_comp: Whether to include compression data in UnpackedObject 

4497 convert_ofs_delta: Whether to convert offset deltas to ref deltas 

4498 """ 

4499 offset = self.index.object_offset(sha) 

4500 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp) 

4501 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta: 

4502 assert isinstance(unpacked.delta_base, int) 

4503 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base) 

4504 unpacked.pack_type_num = REF_DELTA 

4505 return unpacked 

4506 
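
# Editorial sketch: reading an object out of an on-disk pack. The basename
# and sha are illustrative; __contains__ consults the index, and __getitem__
# resolves any delta chain via resolve_object().
def _example_read_object(
    basename: str, sha: "ObjectID", object_format: "ObjectFormat"
) -> "ShaFile | None":
    with Pack(basename, object_format=object_format) as pack:
        if sha in pack:
            return pack[sha]
    return None
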

4507 

4508def extend_pack( 

4509 f: BinaryIO, 

4510 object_ids: Set["RawObjectID"], 

4511 get_raw: Callable[["RawObjectID | ObjectID"], tuple[int, bytes]], 

4512 object_format: "ObjectFormat", 

4513 *, 

4514 compression_level: int = -1, 

4515 progress: Callable[[bytes], None] | None = None, 

4516) -> tuple[bytes, list[tuple[RawObjectID, int, int]]]: 

4517 """Extend a pack file with more objects. 

4518 

4519 The caller should make sure that object_ids does not contain any objects 

4520 that are already in the pack. 

4521 """ 

4522 # Update the header with the new number of objects. 

4523 f.seek(0) 

4524 _version, num_objects = read_pack_header(f.read) 

4525 

4526 if object_ids: 

4527 f.seek(0) 

4528 write_pack_header(f.write, num_objects + len(object_ids)) 

4529 

4530 # Must flush before reading (http://bugs.python.org/issue3207) 

4531 f.flush() 

4532 

4533 # Rescan the rest of the pack, computing the SHA with the new header. 

4534 new_sha = compute_file_sha( 

4535 f, hash_func=object_format.hash_func, end_ofs=-object_format.oid_length 

4536 ) 

4537 

4538 # Must reposition before writing (http://bugs.python.org/issue3207) 

4539 f.seek(0, os.SEEK_CUR) 

4540 

4541 extra_entries = [] 

4542 

4543 # Complete the pack. 

4544 for i, object_id in enumerate(object_ids): 

4545 if progress is not None: 

4546 progress( 

4547 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii") 

4548 ) 

4549 assert len(object_id) == object_format.oid_length 

4550 type_num, data = get_raw(object_id) 

4551 offset = f.tell() 

4552 crc32 = write_pack_object( 

4553 f.write, 

4554 type_num, 

4555 [data], # Convert bytes to list[bytes] 

4556 sha=new_sha, 

4557 compression_level=compression_level, 

4558 object_format=object_format, 

4559 ) 

4560 extra_entries.append((object_id, offset, crc32)) 

4561 pack_sha = new_sha.digest() 

4562 f.write(pack_sha) 

4563 return pack_sha, extra_entries 

4564 
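
# Editorial sketch: completing a thin pack on disk. `store` stands in for any
# object exposing get_raw(sha) -> (type_num, raw_bytes), an assumption that
# mirrors the get_raw callback's signature; the ids must not already be in
# the pack.
def _example_complete_thin_pack(
    path: str, missing: "set[RawObjectID]", store, object_format: "ObjectFormat"
) -> None:
    with open(path, "r+b") as f:
        pack_sha, extra_entries = extend_pack(
            f, missing, store.get_raw, object_format
        )
    # extend_pack rewrites the header count, appends the objects, and writes
    # a recomputed checksum trailer, so the pack's checksum changes.
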

4565 

4566try: 

4567 from dulwich._pack import ( # type: ignore 

4568 apply_delta, 

4569 bisect_find_sha, 

4570 ) 

4571except ImportError: 

4572 pass 

4573 

4574# Try to import the Rust version of create_delta 

4575try: 

4576 from dulwich._pack import create_delta as _create_delta_rs 

4577except ImportError: 

4578 pass 

4579else: 

4580 # Wrap the Rust version, which returns bytes, to match the Python API (an iterator of chunks) 

4581 def _create_delta_rs_wrapper(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

4582 """Wrapper for Rust create_delta to match Python API.""" 

4583 yield _create_delta_rs(base_buf, target_buf) 

4584 

4585 create_delta = _create_delta_rs_wrapper