Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%

1651 statements  

1# pack.py -- For dealing with packed git objects. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Classes for dealing with packed git objects. 

24 

25A pack is a compact representation of a bunch of objects, stored 

26using deltas where possible. 

27 

28A pack has two parts: the pack file, which stores the data, and an index 

29that tells you where the data is. 

30 

31To find an object you look in each index file until you find a match for 

32the object name. The offset recorded there is then used as an offset 

33into the corresponding pack file. 

34""" 

35 

36import binascii 

37from collections import defaultdict, deque 

38from contextlib import suppress 

39from io import BytesIO, UnsupportedOperation 

40 

41try: 

42 from cdifflib import CSequenceMatcher as SequenceMatcher 

43except ModuleNotFoundError: 

44 from difflib import SequenceMatcher 

45 

46import os 

47import struct 

48import sys 

49import warnings 

50import zlib 

51from collections.abc import Iterable, Iterator, Sequence, Set 

52from hashlib import sha1 

53from itertools import chain 

54from os import SEEK_CUR, SEEK_END 

55from struct import unpack_from 

56from types import TracebackType 

57from typing import ( 

58 IO, 

59 TYPE_CHECKING, 

60 Any, 

61 BinaryIO, 

62 Callable, 

63 Generic, 

64 Optional, 

65 Protocol, 

66 TypeVar, 

67 Union, 

68) 

69 

70try: 

71 import mmap 

72except ImportError: 

73 has_mmap = False 

74else: 

75 has_mmap = True 

76 

77if sys.version_info >= (3, 12): 

78 from collections.abc import Buffer 

79else: 

80 Buffer = Union[bytes, bytearray, memoryview] 

81 

82if TYPE_CHECKING: 

83 from _hashlib import HASH as HashObject 

84 

85 from .bitmap import PackBitmap 

86 from .commit_graph import CommitGraph 

87 

88# For some reason the above try, except fails to set has_mmap = False for plan9 

89if sys.platform == "Plan9": 

90 has_mmap = False 

91 

92from . import replace_me 

93from .errors import ApplyDeltaError, ChecksumMismatch 

94from .file import GitFile, _GitFile 

95from .lru_cache import LRUSizeCache 

96from .objects import ObjectID, ShaFile, hex_to_sha, object_header, sha_to_hex 

97 

98OFS_DELTA = 6 

99REF_DELTA = 7 

100 

101DELTA_TYPES = (OFS_DELTA, REF_DELTA) 

102 

103 

104DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 

105 

106# Keep pack files under 16Mb in memory, otherwise write them out to disk 

107PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 

108 

109# Default pack index version to use when none is specified 

110DEFAULT_PACK_INDEX_VERSION = 2 

111 

112 

113OldUnpackedObject = Union[tuple[Union[bytes, int], list[bytes]], list[bytes]] 

114ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]] 

115ProgressFn = Callable[[int, str], None] 

116PackHint = tuple[int, Optional[bytes]] 

117 

118 

119class UnresolvedDeltas(Exception): 

120 """Delta objects could not be resolved.""" 

121 

122 def __init__(self, shas: list[bytes]) -> None: 

123 """Initialize UnresolvedDeltas exception. 

124 

125 Args: 

126 shas: List of SHA hashes for unresolved delta objects 

127 """ 

128 self.shas = shas 

129 

130 

131class ObjectContainer(Protocol): 

132 """Protocol for objects that can contain git objects.""" 

133 

134 def add_object(self, obj: ShaFile) -> None: 

135 """Add a single object to this object store.""" 

136 

137 def add_objects( 

138 self, 

139 objects: Sequence[tuple[ShaFile, Optional[str]]], 

140 progress: Optional[Callable[..., None]] = None, 

141 ) -> Optional["Pack"]: 

142 """Add a set of objects to this object store. 

143 

144 Args: 

145 objects: Iterable over a list of (object, path) tuples 

146 progress: Progress callback for object insertion 

147 Returns: Optional Pack object of the objects written. 

148 """ 

149 

150 def __contains__(self, sha1: bytes) -> bool: 

151 """Check if a hex sha is present.""" 

152 

153 def __getitem__(self, sha1: bytes) -> ShaFile: 

154 """Retrieve an object.""" 

155 

156 def get_commit_graph(self) -> Optional["CommitGraph"]: 

157 """Get the commit graph for this object store. 

158 

159 Returns: 

160 CommitGraph object if available, None otherwise 

161 """ 

162 return None 

163 

164 

165class PackedObjectContainer(ObjectContainer): 

166 """Container for objects packed in a pack file.""" 

167 

168 def get_unpacked_object( 

169 self, sha1: bytes, *, include_comp: bool = False 

170 ) -> "UnpackedObject": 

171 """Get a raw unresolved object. 

172 

173 Args: 

174 sha1: SHA-1 hash of the object 

175 include_comp: Whether to include compressed data 

176 

177 Returns: 

178 UnpackedObject instance 

179 """ 

180 raise NotImplementedError(self.get_unpacked_object) 

181 

182 def iterobjects_subset( 

183 self, shas: Iterable[bytes], *, allow_missing: bool = False 

184 ) -> Iterator[ShaFile]: 

185 """Iterate over a subset of objects. 

186 

187 Args: 

188 shas: Iterable of object SHAs to retrieve 

189 allow_missing: If True, skip missing objects 

190 

191 Returns: 

192 Iterator of ShaFile objects 

193 """ 

194 raise NotImplementedError(self.iterobjects_subset) 

195 

196 def iter_unpacked_subset( 

197 self, 

198 shas: Iterable[bytes], 

199 *, 

200 include_comp: bool = False, 

201 allow_missing: bool = False, 

202 convert_ofs_delta: bool = True, 

203 ) -> Iterator["UnpackedObject"]: 

204 """Iterate over unpacked objects from a subset of SHAs. 

205 

206 Args: 

207 shas: Set of object SHAs to retrieve 

208 include_comp: Include compressed data if True 

209 allow_missing: If True, skip missing objects 

210 convert_ofs_delta: If True, convert offset deltas to ref deltas 

211 

212 Returns: 

213 Iterator of UnpackedObject instances 

214 """ 

215 raise NotImplementedError(self.iter_unpacked_subset) 

216 

217 

218class UnpackedObjectStream: 

219 """Abstract base class for a stream of unpacked objects.""" 

220 

221 def __iter__(self) -> Iterator["UnpackedObject"]: 

222 """Iterate over unpacked objects.""" 

223 raise NotImplementedError(self.__iter__) 

224 

225 def __len__(self) -> int: 

226 """Return the number of objects in the stream.""" 

227 raise NotImplementedError(self.__len__) 

228 

229 

230def take_msb_bytes( 

231 read: Callable[[int], bytes], crc32: Optional[int] = None 

232) -> tuple[list[int], Optional[int]]: 

233 """Read bytes marked with most significant bit. 

234 

235 Args: 

236 read: Read function 

237 crc32: Optional CRC32 checksum to update 

238 

239 Returns: 

240 Tuple of (list of bytes read, updated CRC32 or None) 

241 """ 

242 ret: list[int] = [] 

243 while len(ret) == 0 or ret[-1] & 0x80: 

244 b = read(1) 

245 if crc32 is not None: 

246 crc32 = binascii.crc32(b, crc32) 

247 ret.append(ord(b[:1])) 

248 return ret, crc32 

249 
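# Illustrative sketch of how take_msb_bytes consumes a variable-length run:
# every byte with the most significant bit set means another byte follows.
# The helper name below is hypothetical and only demonstrates the call.
def _example_take_msb_bytes() -> None:
    buf = BytesIO(bytes([0x91, 0x2E, 0xFF]))
    ret, crc = take_msb_bytes(buf.read)
    # 0x91 has the MSB set, so reading continued; 0x2E ended the run and
    # the trailing 0xFF byte was left unread.
    assert ret == [0x91, 0x2E]
    assert crc is None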

250 

251class PackFileDisappeared(Exception): 

252 """Raised when a pack file unexpectedly disappears.""" 

253 

254 def __init__(self, obj: object) -> None: 

255 """Initialize PackFileDisappeared exception. 

256 

257 Args: 

258 obj: The object that triggered the exception 

259 """ 

260 self.obj = obj 

261 

262 

263class UnpackedObject: 

264 """Class encapsulating an object unpacked from a pack file. 

265 

266 These objects should only be created from within unpack_object. Most 

267 members start out as empty and are filled in at various points by 

268 read_zlib_chunks, unpack_object, DeltaChainIterator, etc. 

269 

270 End users of this object should take care that the function they're getting 

271 this object from is guaranteed to set the members they need. 

272 """ 

273 

274 __slots__ = [ 

275 "_sha", # Cached binary SHA. 

276 "comp_chunks", # Compressed object chunks. 

277 "crc32", # CRC32. 

278 "decomp_chunks", # Decompressed object chunks. 

279 "decomp_len", # Decompressed length of this object. 

280 "delta_base", # Delta base offset or SHA. 

281 "obj_chunks", # Decompressed and delta-resolved chunks. 

282 "obj_type_num", # Type of this object. 

283 "offset", # Offset in its pack. 

284 "pack_type_num", # Type of this object in the pack (may be a delta). 

285 ] 

286 

287 obj_type_num: Optional[int] 

288 obj_chunks: Optional[list[bytes]] 

289 delta_base: Union[None, bytes, int] 

290 decomp_chunks: list[bytes] 

291 comp_chunks: Optional[list[bytes]] 

292 decomp_len: Optional[int] 

293 crc32: Optional[int] 

294 offset: Optional[int] 

295 pack_type_num: int 

296 _sha: Optional[bytes] 

297 

298 # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be 

299 # methods of this object. 

300 def __init__( 

301 self, 

302 pack_type_num: int, 

303 *, 

304 delta_base: Union[None, bytes, int] = None, 

305 decomp_len: Optional[int] = None, 

306 crc32: Optional[int] = None, 

307 sha: Optional[bytes] = None, 

308 decomp_chunks: Optional[list[bytes]] = None, 

309 offset: Optional[int] = None, 

310 ) -> None: 

311 """Initialize an UnpackedObject. 

312 

313 Args: 

314 pack_type_num: Type number of this object in the pack 

315 delta_base: Delta base (offset or SHA) if this is a delta object 

316 decomp_len: Decompressed length of this object 

317 crc32: CRC32 checksum 

318 sha: SHA-1 hash of the object 

319 decomp_chunks: Decompressed chunks 

320 offset: Offset in the pack file 

321 """ 

322 self.offset = offset 

323 self._sha = sha 

324 self.pack_type_num = pack_type_num 

325 self.delta_base = delta_base 

326 self.comp_chunks = None 

327 self.decomp_chunks: list[bytes] = decomp_chunks or [] 

328 if decomp_chunks is not None and decomp_len is None: 

329 self.decomp_len = sum(map(len, decomp_chunks)) 

330 else: 

331 self.decomp_len = decomp_len 

332 self.crc32 = crc32 

333 

334 if pack_type_num in DELTA_TYPES: 

335 self.obj_type_num = None 

336 self.obj_chunks = None 

337 else: 

338 self.obj_type_num = pack_type_num 

339 self.obj_chunks = self.decomp_chunks 

340 self.delta_base = delta_base 

341 

342 def sha(self) -> bytes: 

343 """Return the binary SHA of this object.""" 

344 if self._sha is None: 

345 assert self.obj_type_num is not None and self.obj_chunks is not None 

346 self._sha = obj_sha(self.obj_type_num, self.obj_chunks) 

347 return self._sha 

348 

349 def sha_file(self) -> ShaFile: 

350 """Return a ShaFile from this object.""" 

351 assert self.obj_type_num is not None and self.obj_chunks is not None 

352 return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks) 

353 

354 # Only provided for backwards compatibility with code that expects either 

355 # chunks or a delta tuple. 

356 def _obj(self) -> OldUnpackedObject: 

357 """Return the decompressed chunks, or (delta base, delta chunks).""" 

358 if self.pack_type_num in DELTA_TYPES: 

359 assert isinstance(self.delta_base, (bytes, int)) 

360 return (self.delta_base, self.decomp_chunks) 

361 else: 

362 return self.decomp_chunks 

363 

364 def __eq__(self, other: object) -> bool: 

365 """Check equality with another UnpackedObject.""" 

366 if not isinstance(other, UnpackedObject): 

367 return False 

368 for slot in self.__slots__: 

369 if getattr(self, slot) != getattr(other, slot): 

370 return False 

371 return True 

372 

373 def __ne__(self, other: object) -> bool: 

374 """Check inequality with another UnpackedObject.""" 

375 return not (self == other) 

376 

377 def __repr__(self) -> str: 

378 """Return string representation of this UnpackedObject.""" 

379 data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__] 

380 return "{}({})".format(self.__class__.__name__, ", ".join(data)) 

381 

382 

383_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance 

384 

385 

386def read_zlib_chunks( 

387 read_some: Callable[[int], bytes], 

388 unpacked: UnpackedObject, 

389 include_comp: bool = False, 

390 buffer_size: int = _ZLIB_BUFSIZE, 

391) -> bytes: 

392 """Read zlib data from a buffer. 

393 

394 This function requires that the buffer have additional data following the 

395 compressed data, which is guaranteed to be the case for git pack files. 

396 

397 Args: 

398 read_some: Read function that returns at least one byte, but may 

399 return less than the requested size. 

400 unpacked: An UnpackedObject to write result data to. If its crc32 

401 attr is not None, the CRC32 of the compressed bytes will be computed 

402 using this starting CRC32. 

403 After this function, will have the following attrs set: 

404 * comp_chunks (if include_comp is True) 

405 * decomp_chunks 

406 * decomp_len 

407 * crc32 

408 include_comp: If True, include compressed data in the result. 

409 buffer_size: Size of the read buffer. 

410 Returns: Leftover unused data from the decompression. 

411 

412 Raises: 

413 zlib.error: if a decompression error occurred. 

414 """ 

415 if unpacked.decomp_len is None or unpacked.decomp_len <= -1: 

416 raise ValueError("non-negative zlib data stream size expected") 

417 decomp_obj = zlib.decompressobj() 

418 

419 comp_chunks = [] 

420 decomp_chunks = unpacked.decomp_chunks 

421 decomp_len = 0 

422 crc32 = unpacked.crc32 

423 

424 while True: 

425 add = read_some(buffer_size) 

426 if not add: 

427 raise zlib.error("EOF before end of zlib stream") 

428 comp_chunks.append(add) 

429 decomp = decomp_obj.decompress(add) 

430 decomp_len += len(decomp) 

431 decomp_chunks.append(decomp) 

432 unused = decomp_obj.unused_data 

433 if unused: 

434 left = len(unused) 

435 if crc32 is not None: 

436 crc32 = binascii.crc32(add[:-left], crc32) 

437 if include_comp: 

438 comp_chunks[-1] = add[:-left] 

439 break 

440 elif crc32 is not None: 

441 crc32 = binascii.crc32(add, crc32) 

442 if crc32 is not None: 

443 crc32 &= 0xFFFFFFFF 

444 

445 if decomp_len != unpacked.decomp_len: 

446 raise zlib.error("decompressed data does not match expected size") 

447 

448 unpacked.crc32 = crc32 

449 if include_comp: 

450 unpacked.comp_chunks = comp_chunks 

451 return unused 

452 
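# Illustrative sketch of reading one zlib stream out of a larger buffer, as
# happens for every object body in a pack; the trailing bytes stand in for
# whatever follows the object. The helper name is hypothetical.
def _example_read_zlib_chunks() -> None:
    payload = b"example payload"
    stream = BytesIO(zlib.compress(payload) + b"next entry...")
    unpacked = UnpackedObject(3, decomp_len=len(payload))  # 3 = blob
    leftover = read_zlib_chunks(stream.read, unpacked)
    assert b"".join(unpacked.decomp_chunks) == payload
    assert leftover == b"next entry..."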

453 

454def iter_sha1(iter: Iterable[bytes]) -> bytes: 

455 """Return the hexdigest of the SHA1 over a set of names. 

456 

457 Args: 

458 iter: Iterator over string objects 

459 Returns: 40-byte hex sha1 digest 

460 """ 

461 sha = sha1() 

462 for name in iter: 

463 sha.update(name) 

464 return sha.hexdigest().encode("ascii") 

465 
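# Illustrative sketch: iter_sha1 hashes the concatenation of the names it is
# given and returns the hex digest as ASCII bytes. Hypothetical helper name.
def _example_iter_sha1() -> None:
    assert iter_sha1([b"ab", b"cd"]) == sha1(b"abcd").hexdigest().encode("ascii")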

466 

467def load_pack_index(path: Union[str, os.PathLike[str]]) -> "PackIndex": 

468 """Load an index file by path. 

469 

470 Args: 

471 path: Path to the index file 

472 Returns: A PackIndex loaded from the given path 

473 """ 

474 with GitFile(path, "rb") as f: 

475 return load_pack_index_file(path, f) 

476 

477 

478def _load_file_contents( 

479 f: Union[IO[bytes], _GitFile], size: Optional[int] = None 

480) -> tuple[Union[bytes, Any], int]: 

481 """Load contents from a file, preferring mmap when possible. 

482 

483 Args: 

484 f: File-like object to load 

485 size: Expected size, or None to determine from file 

486 Returns: Tuple of (contents, size) 

487 """ 

488 try: 

489 fd = f.fileno() 

490 except (UnsupportedOperation, AttributeError): 

491 fd = None 

492 # Attempt to use mmap if possible 

493 if fd is not None: 

494 if size is None: 

495 size = os.fstat(fd).st_size 

496 if has_mmap: 

497 try: 

498 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ) 

499 except (OSError, ValueError): 

500 # Can't mmap - perhaps a socket or invalid file descriptor 

501 pass 

502 else: 

503 return contents, size 

504 contents_bytes = f.read() 

505 size = len(contents_bytes) 

506 return contents_bytes, size 

507 

508 

509def load_pack_index_file( 

510 path: Union[str, os.PathLike[str]], f: Union[IO[bytes], _GitFile] 

511) -> "PackIndex": 

512 """Load an index file from a file-like object. 

513 

514 Args: 

515 path: Path for the index file 

516 f: File-like object 

517 Returns: A PackIndex loaded from the given file 

518 """ 

519 contents, size = _load_file_contents(f) 

520 if contents[:4] == b"\377tOc": 

521 version = struct.unpack(b">L", contents[4:8])[0] 

522 if version == 2: 

523 return PackIndex2(path, file=f, contents=contents, size=size) 

524 elif version == 3: 

525 return PackIndex3(path, file=f, contents=contents, size=size) 

526 else: 

527 raise KeyError(f"Unknown pack index format {version}") 

528 else: 

529 return PackIndex1(path, file=f, contents=contents, size=size) 

530 

531 

532def bisect_find_sha( 

533 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes] 

534) -> Optional[int]: 

535 """Find a SHA in a data blob with sorted SHAs. 

536 

537 Args: 

538 start: Start index of range to search 

539 end: End index of range to search 

540 sha: Sha to find 

541 unpack_name: Callback to retrieve SHA by index 

542 Returns: Index of the SHA, or None if it wasn't found 

543 """ 

544 assert start <= end 

545 while start <= end: 

546 i = (start + end) // 2 

547 file_sha = unpack_name(i) 

548 if file_sha < sha: 

549 start = i + 1 

550 elif file_sha > sha: 

551 end = i - 1 

552 else: 

553 return i 

554 return None 

555 
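# Illustrative sketch of bisect_find_sha over a flat buffer of sorted 20-byte
# names, mirroring how the index classes below slice their name tables.
# The helper name is hypothetical.
def _example_bisect_find_sha() -> None:
    names = sorted(sha1(bytes([i])).digest() for i in range(8))
    blob = b"".join(names)

    def unpack_name(i: int) -> bytes:
        return blob[i * 20 : (i + 1) * 20]

    assert bisect_find_sha(0, len(names) - 1, names[5], unpack_name) == 5
    assert bisect_find_sha(0, len(names) - 1, b"\x00" * 20, unpack_name) is None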

556 

557PackIndexEntry = tuple[bytes, int, Optional[int]] 

558 

559 

560class PackIndex: 

561 """An index in to a packfile. 

562 

563 Given a sha id of an object a pack index can tell you the location in the 

564 packfile of that object if it has it. 

565 """ 

566 

567 # Default to SHA-1 for backward compatibility 

568 hash_algorithm = 1 

569 hash_size = 20 

570 

571 def __eq__(self, other: object) -> bool: 

572 """Check equality with another PackIndex.""" 

573 if not isinstance(other, PackIndex): 

574 return False 

575 

576 for (name1, _, _), (name2, _, _) in zip( 

577 self.iterentries(), other.iterentries() 

578 ): 

579 if name1 != name2: 

580 return False 

581 return True 

582 

583 def __ne__(self, other: object) -> bool: 

584 """Check if this pack index is not equal to another.""" 

585 return not self.__eq__(other) 

586 

587 def __len__(self) -> int: 

588 """Return the number of entries in this pack index.""" 

589 raise NotImplementedError(self.__len__) 

590 

591 def __iter__(self) -> Iterator[bytes]: 

592 """Iterate over the SHAs in this pack.""" 

593 return map(sha_to_hex, self._itersha()) 

594 

595 def iterentries(self) -> Iterator[PackIndexEntry]: 

596 """Iterate over the entries in this pack index. 

597 

598 Returns: iterator over tuples with object name, offset in packfile and 

599 crc32 checksum. 

600 """ 

601 raise NotImplementedError(self.iterentries) 

602 

603 def get_pack_checksum(self) -> Optional[bytes]: 

604 """Return the SHA1 checksum stored for the corresponding packfile. 

605 

606 Returns: 20-byte binary digest, or None if not available 

607 """ 

608 raise NotImplementedError(self.get_pack_checksum) 

609 

610 @replace_me(since="0.21.0", remove_in="0.23.0") 

611 def object_index(self, sha: bytes) -> int: 

612 """Return the index for the given SHA. 

613 

614 Args: 

615 sha: SHA-1 hash 

616 

617 Returns: 

618 Index position 

619 """ 

620 return self.object_offset(sha) 

621 

622 def object_offset(self, sha: bytes) -> int: 

623 """Return the offset in to the corresponding packfile for the object. 

624 

625 Given the name of an object it will return the offset that object 

626 lives at within the corresponding pack file. If the pack file doesn't 

627 have the object then None will be returned. 

628 """ 

629 raise NotImplementedError(self.object_offset) 

630 

631 def object_sha1(self, index: int) -> bytes: 

632 """Return the SHA1 corresponding to the index in the pack file.""" 

633 for name, offset, _crc32 in self.iterentries(): 

634 if offset == index: 

635 return name 

636 else: 

637 raise KeyError(index) 

638 

639 def _object_offset(self, sha: bytes) -> int: 

640 """See object_offset. 

641 

642 Args: 

643 sha: A *binary* SHA string (20 bytes long). 

644 """ 

645 raise NotImplementedError(self._object_offset) 

646 

647 def objects_sha1(self) -> bytes: 

648 """Return the hex SHA1 over all the shas of all objects in this pack. 

649 

650 Note: This is used for the filename of the pack. 

651 """ 

652 return iter_sha1(self._itersha()) 

653 

654 def _itersha(self) -> Iterator[bytes]: 

655 """Yield all the SHA1's of the objects in the index, sorted.""" 

656 raise NotImplementedError(self._itersha) 

657 

658 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

659 """Iterate over all SHA1s with the given prefix. 

660 

661 Args: 

662 prefix: Binary prefix to match 

663 Returns: Iterator of matching SHA1s 

664 """ 

665 # Default implementation for PackIndex classes that don't override 

666 for sha, _, _ in self.iterentries(): 

667 if sha.startswith(prefix): 

668 yield sha 

669 

670 def close(self) -> None: 

671 """Close any open files.""" 

672 

673 def check(self) -> None: 

674 """Check the consistency of this pack index.""" 

675 

676 

677class MemoryPackIndex(PackIndex): 

678 """Pack index that is stored entirely in memory.""" 

679 

680 def __init__( 

681 self, 

682 entries: list[tuple[bytes, int, Optional[int]]], 

683 pack_checksum: Optional[bytes] = None, 

684 ) -> None: 

685 """Create a new MemoryPackIndex. 

686 

687 Args: 

688 entries: Sequence of name, idx, crc32 (sorted) 

689 pack_checksum: Optional pack checksum 

690 """ 

691 self._by_sha = {} 

692 self._by_offset = {} 

693 for name, offset, _crc32 in entries: 

694 self._by_sha[name] = offset 

695 self._by_offset[offset] = name 

696 self._entries = entries 

697 self._pack_checksum = pack_checksum 

698 

699 def get_pack_checksum(self) -> Optional[bytes]: 

700 """Return the SHA checksum stored for the corresponding packfile.""" 

701 return self._pack_checksum 

702 

703 def __len__(self) -> int: 

704 """Return the number of entries in this pack index.""" 

705 return len(self._entries) 

706 

707 def object_offset(self, sha: bytes) -> int: 

708 """Return the offset for the given SHA. 

709 

710 Args: 

711 sha: SHA to look up (binary or hex) 

712 Returns: Offset in the pack file 

713 """ 

714 if len(sha) == 40: 

715 sha = hex_to_sha(sha) 

716 return self._by_sha[sha] 

717 

718 def object_sha1(self, offset: int) -> bytes: 

719 """Return the SHA1 for the object at the given offset.""" 

720 return self._by_offset[offset] 

721 

722 def _itersha(self) -> Iterator[bytes]: 

723 """Iterate over all SHA1s in the index.""" 

724 return iter(self._by_sha) 

725 

726 def iterentries(self) -> Iterator[PackIndexEntry]: 

727 """Iterate over all index entries.""" 

728 return iter(self._entries) 

729 

730 @classmethod 

731 def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex": 

732 """Create a MemoryPackIndex from a PackData object.""" 

733 return MemoryPackIndex( 

734 list(pack_data.sorted_entries()), pack_data.get_stored_checksum() 

735 ) 

736 

737 @classmethod 

738 def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex": 

739 """Create a copy of another PackIndex in memory.""" 

740 return cls(list(other_index.iterentries()), other_index.get_pack_checksum()) 

741 
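# Illustrative sketch of the PackIndex lookup interface using the in-memory
# implementation: entries are (binary sha, offset, crc32) tuples sorted by
# sha. The helper name is hypothetical.
def _example_memory_pack_index() -> None:
    entries = sorted((sha1(bytes([i])).digest(), 100 + i, None) for i in range(3))
    index = MemoryPackIndex(entries)
    sha, offset, _crc = entries[0]
    assert index.object_offset(sha) == offset
    assert index.object_sha1(offset) == sha
    assert len(index) == 3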

742 

743class FilePackIndex(PackIndex): 

744 """Pack index that is based on a file. 

745 

746 To do a lookup it maps the file and uses the fan-out table: 256 four-byte 

747 entries indexed by the first byte of the sha id. The entry for a byte is 

748 the end of the group of objects whose shas start with that byte; the 

749 entry for the previous byte is the start of the group. The shas within a 

750 group are sorted, so bisecting between the start and end offsets 

751 determines whether the value is 

752 present. 

753 """ 

754 
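    # A worked example of the lookup described above: for a binary sha whose
    # first byte is 0xAB, fan_out_table[0xAA] is the number of objects with a
    # first byte <= 0xAA (the start of the 0xAB group) and fan_out_table[0xAB]
    # is its end; bisect_find_sha then searches the name table between those
    # two positions.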

755 _fan_out_table: list[int] 

756 _file: Union[IO[bytes], _GitFile] 

757 

758 def __init__( 

759 self, 

760 filename: Union[str, os.PathLike[str]], 

761 file: Optional[Union[IO[bytes], _GitFile]] = None, 

762 contents: Optional[Union[bytes, "mmap.mmap"]] = None, 

763 size: Optional[int] = None, 

764 ) -> None: 

765 """Create a pack index object. 

766 

767 Provide it with the name of the index file to consider, and it will map 

768 it whenever required. 

769 """ 

770 self._filename = filename 

771 # Take the size now, so it can be checked each time we map the file to 

772 # ensure that it hasn't changed. 

773 if file is None: 

774 self._file = GitFile(filename, "rb") 

775 else: 

776 self._file = file 

777 if contents is None: 

778 self._contents, self._size = _load_file_contents(self._file, size) 

779 else: 

780 self._contents = contents 

781 self._size = size if size is not None else len(contents) 

782 

783 @property 

784 def path(self) -> str: 

785 """Return the path to this index file.""" 

786 return os.fspath(self._filename) 

787 

788 def __eq__(self, other: object) -> bool: 

789 """Check equality with another FilePackIndex.""" 

790 # Quick optimization: 

791 if ( 

792 isinstance(other, FilePackIndex) 

793 and self._fan_out_table != other._fan_out_table 

794 ): 

795 return False 

796 

797 return super().__eq__(other) 

798 

799 def close(self) -> None: 

800 """Close the underlying file and any mmap.""" 

801 self._file.close() 

802 close_fn = getattr(self._contents, "close", None) 

803 if close_fn is not None: 

804 close_fn() 

805 

806 def __len__(self) -> int: 

807 """Return the number of entries in this pack index.""" 

808 return self._fan_out_table[-1] 

809 

810 def _unpack_entry(self, i: int) -> PackIndexEntry: 

811 """Unpack the i-th entry in the index file. 

812 

813 Returns: Tuple with object name (SHA), offset in pack file and CRC32 

814 checksum (if known). 

815 """ 

816 raise NotImplementedError(self._unpack_entry) 

817 

818 def _unpack_name(self, i: int) -> bytes: 

819 """Unpack the i-th name from the index file.""" 

820 raise NotImplementedError(self._unpack_name) 

821 

822 def _unpack_offset(self, i: int) -> int: 

823 """Unpack the i-th object offset from the index file.""" 

824 raise NotImplementedError(self._unpack_offset) 

825 

826 def _unpack_crc32_checksum(self, i: int) -> Optional[int]: 

827 """Unpack the crc32 checksum for the ith object from the index file.""" 

828 raise NotImplementedError(self._unpack_crc32_checksum) 

829 

830 def _itersha(self) -> Iterator[bytes]: 

831 """Iterate over all SHA1s in the index.""" 

832 for i in range(len(self)): 

833 yield self._unpack_name(i) 

834 

835 def iterentries(self) -> Iterator[PackIndexEntry]: 

836 """Iterate over the entries in this pack index. 

837 

838 Returns: iterator over tuples with object name, offset in packfile and 

839 crc32 checksum. 

840 """ 

841 for i in range(len(self)): 

842 yield self._unpack_entry(i) 

843 

844 def _read_fan_out_table(self, start_offset: int) -> list[int]: 

845 """Read the fan-out table from the index. 

846 

847 The fan-out table contains 256 entries mapping first byte values 

848 to the number of objects with SHA1s less than or equal to that byte. 

849 

850 Args: 

851 start_offset: Offset in the file where the fan-out table starts 

852 Returns: List of 256 integers 

853 """ 

854 ret = [] 

855 for i in range(0x100): 

856 fanout_entry = self._contents[ 

857 start_offset + i * 4 : start_offset + (i + 1) * 4 

858 ] 

859 ret.append(struct.unpack(">L", fanout_entry)[0]) 

860 return ret 

861 

862 def check(self) -> None: 

863 """Check that the stored checksum matches the actual checksum.""" 

864 actual = self.calculate_checksum() 

865 stored = self.get_stored_checksum() 

866 if actual != stored: 

867 raise ChecksumMismatch(stored, actual) 

868 

869 def calculate_checksum(self) -> bytes: 

870 """Calculate the SHA1 checksum over this pack index. 

871 

872 Returns: This is a 20-byte binary digest 

873 """ 

874 return sha1(self._contents[:-20]).digest() 

875 

876 def get_pack_checksum(self) -> bytes: 

877 """Return the SHA1 checksum stored for the corresponding packfile. 

878 

879 Returns: 20-byte binary digest 

880 """ 

881 return bytes(self._contents[-40:-20]) 

882 

883 def get_stored_checksum(self) -> bytes: 

884 """Return the SHA1 checksum stored for this index. 

885 

886 Returns: 20-byte binary digest 

887 """ 

888 return bytes(self._contents[-20:]) 

889 

890 def object_offset(self, sha: bytes) -> int: 

891 """Return the offset in to the corresponding packfile for the object. 

892 

893 Given the name of an object it will return the offset that object 

894 lives at within the corresponding pack file. If the pack file doesn't 

895 have the object then None will be returned. 

896 """ 

897 if len(sha) == 40: 

898 sha = hex_to_sha(sha) 

899 try: 

900 return self._object_offset(sha) 

901 except ValueError as exc: 

902 closed = getattr(self._contents, "closed", None) 

903 if closed in (None, True): 

904 raise PackFileDisappeared(self) from exc 

905 raise 

906 

907 def _object_offset(self, sha: bytes) -> int: 

908 """See object_offset. 

909 

910 Args: 

911 sha: A *binary* SHA string (20 bytes long). 

912 """ 

913 assert len(sha) == 20 

914 idx = ord(sha[:1]) 

915 if idx == 0: 

916 start = 0 

917 else: 

918 start = self._fan_out_table[idx - 1] 

919 end = self._fan_out_table[idx] 

920 i = bisect_find_sha(start, end, sha, self._unpack_name) 

921 if i is None: 

922 raise KeyError(sha) 

923 return self._unpack_offset(i) 

924 

925 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

926 """Iterate over all SHA1s with the given prefix.""" 

927 start = ord(prefix[:1]) 

928 if start == 0: 

929 start = 0 

930 else: 

931 start = self._fan_out_table[start - 1] 

932 end = ord(prefix[:1]) + 1 

933 if end == 0x100: 

934 end = len(self) 

935 else: 

936 end = self._fan_out_table[end] 

937 assert start <= end 

938 started = False 

939 for i in range(start, end): 

940 name: bytes = self._unpack_name(i) 

941 if name.startswith(prefix): 

942 yield name 

943 started = True 

944 elif started: 

945 break 

946 

947 

948class PackIndex1(FilePackIndex): 

949 """Version 1 Pack Index file.""" 

950 

951 def __init__( 

952 self, 

953 filename: Union[str, os.PathLike[str]], 

954 file: Optional[Union[IO[bytes], _GitFile]] = None, 

955 contents: Optional[bytes] = None, 

956 size: Optional[int] = None, 

957 ) -> None: 

958 """Initialize a version 1 pack index. 

959 

960 Args: 

961 filename: Path to the index file 

962 file: Optional file object 

963 contents: Optional mmap'd contents 

964 size: Optional size of the index 

965 """ 

966 super().__init__(filename, file, contents, size) 

967 self.version = 1 

968 self._fan_out_table = self._read_fan_out_table(0) 

969 

970 def _unpack_entry(self, i: int) -> tuple[bytes, int, None]: 

971 (offset, name) = unpack_from(">L20s", self._contents, (0x100 * 4) + (i * 24)) 

972 return (name, offset, None) 

973 

974 def _unpack_name(self, i: int) -> bytes: 

975 offset = (0x100 * 4) + (i * 24) + 4 

976 return self._contents[offset : offset + 20] 

977 

978 def _unpack_offset(self, i: int) -> int: 

979 offset = (0x100 * 4) + (i * 24) 

980 result = unpack_from(">L", self._contents, offset)[0] 

981 assert isinstance(result, int) 

982 return result 

983 

984 def _unpack_crc32_checksum(self, i: int) -> None: 

985 # Not stored in v1 index files 

986 return None 

987 

988 

989class PackIndex2(FilePackIndex): 

990 """Version 2 Pack Index file.""" 

991 

992 def __init__( 

993 self, 

994 filename: Union[str, os.PathLike[str]], 

995 file: Optional[Union[IO[bytes], _GitFile]] = None, 

996 contents: Optional[bytes] = None, 

997 size: Optional[int] = None, 

998 ) -> None: 

999 """Initialize a version 2 pack index. 

1000 

1001 Args: 

1002 filename: Path to the index file 

1003 file: Optional file object 

1004 contents: Optional mmap'd contents 

1005 size: Optional size of the index 

1006 """ 

1007 super().__init__(filename, file, contents, size) 

1008 if self._contents[:4] != b"\377tOc": 

1009 raise AssertionError("Not a v2 pack index file") 

1010 (self.version,) = unpack_from(b">L", self._contents, 4) 

1011 if self.version != 2: 

1012 raise AssertionError(f"Version was {self.version}") 

1013 self._fan_out_table = self._read_fan_out_table(8) 

1014 self._name_table_offset = 8 + 0x100 * 4 

1015 self._crc32_table_offset = self._name_table_offset + 20 * len(self) 

1016 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1017 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1018 self 

1019 ) 

1020 

1021 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1022 return ( 

1023 self._unpack_name(i), 

1024 self._unpack_offset(i), 

1025 self._unpack_crc32_checksum(i), 

1026 ) 

1027 

1028 def _unpack_name(self, i: int) -> bytes: 

1029 offset = self._name_table_offset + i * 20 

1030 return self._contents[offset : offset + 20] 

1031 

1032 def _unpack_offset(self, i: int) -> int: 

1033 offset_pos = self._pack_offset_table_offset + i * 4 

1034 offset = unpack_from(">L", self._contents, offset_pos)[0] 

1035 assert isinstance(offset, int) 

1036 if offset & (2**31): 

1037 large_offset_pos = ( 

1038 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1039 ) 

1040 offset = unpack_from(">Q", self._contents, large_offset_pos)[0] 

1041 assert isinstance(offset, int) 

1042 return offset 

1043 

1044 def _unpack_crc32_checksum(self, i: int) -> int: 

1045 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1046 assert isinstance(result, int) 

1047 return result 

1048 

1049 

1050class PackIndex3(FilePackIndex): 

1051 """Version 3 Pack Index file. 

1052 

1053 Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes). 

1054 """ 

1055 

1056 def __init__( 

1057 self, 

1058 filename: Union[str, os.PathLike[str]], 

1059 file: Optional[Union[IO[bytes], _GitFile]] = None, 

1060 contents: Optional[bytes] = None, 

1061 size: Optional[int] = None, 

1062 ) -> None: 

1063 """Initialize a version 3 pack index. 

1064 

1065 Args: 

1066 filename: Path to the index file 

1067 file: Optional file object 

1068 contents: Optional mmap'd contents 

1069 size: Optional size of the index 

1070 """ 

1071 super().__init__(filename, file, contents, size) 

1072 if self._contents[:4] != b"\377tOc": 

1073 raise AssertionError("Not a v3 pack index file") 

1074 (self.version,) = unpack_from(b">L", self._contents, 4) 

1075 if self.version != 3: 

1076 raise AssertionError(f"Version was {self.version}") 

1077 

1078 # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1079 (self.hash_algorithm,) = unpack_from(b">L", self._contents, 8) 

1080 if self.hash_algorithm == 1: 

1081 self.hash_size = 20 # SHA-1 

1082 elif self.hash_algorithm == 2: 

1083 self.hash_size = 32 # SHA-256 

1084 else: 

1085 raise AssertionError(f"Unknown hash algorithm {self.hash_algorithm}") 

1086 

1087 # Read length of shortened object names 

1088 (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12) 

1089 

1090 # Calculate offsets based on variable hash size 

1091 self._fan_out_table = self._read_fan_out_table( 

1092 16 

1093 ) # After header (4 + 4 + 4 + 4) 

1094 self._name_table_offset = 16 + 0x100 * 4 

1095 self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self) 

1096 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1097 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1098 self 

1099 ) 

1100 

1101 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1102 return ( 

1103 self._unpack_name(i), 

1104 self._unpack_offset(i), 

1105 self._unpack_crc32_checksum(i), 

1106 ) 

1107 

1108 def _unpack_name(self, i: int) -> bytes: 

1109 offset = self._name_table_offset + i * self.hash_size 

1110 return self._contents[offset : offset + self.hash_size] 

1111 

1112 def _unpack_offset(self, i: int) -> int: 

1113 offset_pos = self._pack_offset_table_offset + i * 4 

1114 offset = unpack_from(">L", self._contents, offset_pos)[0] 

1115 assert isinstance(offset, int) 

1116 if offset & (2**31): 

1117 large_offset_pos = ( 

1118 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1119 ) 

1120 offset = unpack_from(">Q", self._contents, large_offset_pos)[0] 

1121 assert isinstance(offset, int) 

1122 return offset 

1123 

1124 def _unpack_crc32_checksum(self, i: int) -> int: 

1125 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1126 assert isinstance(result, int) 

1127 return result 

1128 

1129 

1130def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]: 

1131 """Read the header of a pack file. 

1132 

1133 Args: 

1134 read: Read function 

1135 Returns: Tuple of (pack version, number of objects). 

1136 Raises: AssertionError if the header is missing or is not a valid pack header. 

1137 """ 

1138 header = read(12) 

1139 if not header: 

1140 raise AssertionError("file too short to contain pack") 

1141 if header[:4] != b"PACK": 

1142 raise AssertionError(f"Invalid pack header {header!r}") 

1143 (version,) = unpack_from(b">L", header, 4) 

1144 if version not in (2, 3): 

1145 raise AssertionError(f"Version was {version}") 

1146 (num_objects,) = unpack_from(b">L", header, 8) 

1147 return (version, num_objects) 

1148 
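# Illustrative sketch of the 12-byte header that read_pack_header parses:
# the b"PACK" magic, a 4-byte big-endian version and a 4-byte object count.
# The helper name is hypothetical.
def _example_read_pack_header() -> None:
    header = b"PACK" + struct.pack(">L", 2) + struct.pack(">L", 3)
    version, num_objects = read_pack_header(BytesIO(header).read)
    assert (version, num_objects) == (2, 3)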

1149 

1150def chunks_length(chunks: Union[bytes, Iterable[bytes]]) -> int: 

1151 """Get the total length of a sequence of chunks. 

1152 

1153 Args: 

1154 chunks: Either a single bytes object or an iterable of bytes 

1155 Returns: Total length in bytes 

1156 """ 

1157 if isinstance(chunks, bytes): 

1158 return len(chunks) 

1159 else: 

1160 return sum(map(len, chunks)) 

1161 

1162 

1163def unpack_object( 

1164 read_all: Callable[[int], bytes], 

1165 read_some: Optional[Callable[[int], bytes]] = None, 

1166 compute_crc32: bool = False, 

1167 include_comp: bool = False, 

1168 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1169) -> tuple[UnpackedObject, bytes]: 

1170 """Unpack a Git object. 

1171 

1172 Args: 

1173 read_all: Read function that blocks until the number of requested 

1174 bytes are read. 

1175 read_some: Read function that returns at least one byte, but may not 

1176 return the number of bytes requested. 

1177 compute_crc32: If True, compute the CRC32 of the compressed data. If 

1178 False, the returned CRC32 will be None. 

1179 include_comp: If True, include compressed data in the result. 

1180 zlib_bufsize: An optional buffer size for zlib operations. 

1181 Returns: A tuple of (unpacked, unused), where unused is the unused data 

1182 leftover from decompression, and unpacked is an UnpackedObject with 

1183 the following attrs set: 

1184 

1185 * obj_chunks (for non-delta types) 

1186 * pack_type_num 

1187 * delta_base (for delta types) 

1188 * comp_chunks (if include_comp is True) 

1189 * decomp_chunks 

1190 * decomp_len 

1191 * crc32 (if compute_crc32 is True) 

1192 """ 

1193 if read_some is None: 

1194 read_some = read_all 

1195 if compute_crc32: 

1196 crc32 = 0 

1197 else: 

1198 crc32 = None 

1199 

1200 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1201 type_num = (raw[0] >> 4) & 0x07 

1202 size = raw[0] & 0x0F 

1203 for i, byte in enumerate(raw[1:]): 

1204 size += (byte & 0x7F) << ((i * 7) + 4) 

1205 

1206 delta_base: Union[int, bytes, None] 

1207 raw_base = len(raw) 

1208 if type_num == OFS_DELTA: 

1209 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1210 raw_base += len(raw) 

1211 if raw[-1] & 0x80: 

1212 raise AssertionError 

1213 delta_base_offset = raw[0] & 0x7F 

1214 for byte in raw[1:]: 

1215 delta_base_offset += 1 

1216 delta_base_offset <<= 7 

1217 delta_base_offset += byte & 0x7F 

1218 delta_base = delta_base_offset 

1219 elif type_num == REF_DELTA: 

1220 delta_base_obj = read_all(20) 

1221 if crc32 is not None: 

1222 crc32 = binascii.crc32(delta_base_obj, crc32) 

1223 delta_base = delta_base_obj 

1224 raw_base += 20 

1225 else: 

1226 delta_base = None 

1227 

1228 unpacked = UnpackedObject( 

1229 type_num, delta_base=delta_base, decomp_len=size, crc32=crc32 

1230 ) 

1231 unused = read_zlib_chunks( 

1232 read_some, 

1233 unpacked, 

1234 buffer_size=zlib_bufsize, 

1235 include_comp=include_comp, 

1236 ) 

1237 return unpacked, unused 

1238 
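# Illustrative sketch of one complete pack entry as unpack_object consumes
# it: a one-byte header (MSB clear, type 3 = blob, size 2), the zlib-deflated
# body, and trailing bytes standing in for the next entry. Hypothetical helper.
def _example_unpack_object() -> None:
    entry = bytes([0x32]) + zlib.compress(b"hi") + b"..."
    unpacked, unused = unpack_object(BytesIO(entry).read)
    assert unpacked.pack_type_num == 3
    assert unpacked.obj_chunks is not None
    assert b"".join(unpacked.obj_chunks) == b"hi"
    assert unused == b"..."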

1239 

1240def _compute_object_size(value: tuple[int, Any]) -> int: 

1241 """Compute the size of a unresolved object for use with LRUSizeCache.""" 

1242 (num, obj) = value 

1243 if num in DELTA_TYPES: 

1244 return chunks_length(obj[1]) 

1245 return chunks_length(obj) 

1246 

1247 

1248class PackStreamReader: 

1249 """Class to read a pack stream. 

1250 

1251 The pack is read from a ReceivableProtocol using read() or recv() as 

1252 appropriate. 

1253 """ 

1254 

1255 def __init__( 

1256 self, 

1257 read_all: Callable[[int], bytes], 

1258 read_some: Optional[Callable[[int], bytes]] = None, 

1259 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1260 ) -> None: 

1261 """Initialize pack stream reader. 

1262 

1263 Args: 

1264 read_all: Function to read all requested bytes 

1265 read_some: Function to read some bytes (optional) 

1266 zlib_bufsize: Buffer size for zlib decompression 

1267 """ 

1268 self.read_all = read_all 

1269 if read_some is None: 

1270 self.read_some = read_all 

1271 else: 

1272 self.read_some = read_some 

1273 self.sha = sha1() 

1274 self._offset = 0 

1275 self._rbuf = BytesIO() 

1276 # trailer is a deque to avoid memory allocation on small reads 

1277 self._trailer: deque[int] = deque() 

1278 self._zlib_bufsize = zlib_bufsize 

1279 

1280 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1281 """Read up to size bytes using the given callback. 

1282 

1283 As a side effect, update the verifier's hash (excluding the last 20 

1284 bytes read). 

1285 

1286 Args: 

1287 read: The read callback to read from. 

1288 size: The maximum number of bytes to read; the particular 

1289 behavior is callback-specific. 

1290 Returns: Bytes read 

1291 """ 

1292 data = read(size) 

1293 

1294 # maintain a trailer of the last 20 bytes we've read 

1295 n = len(data) 

1296 self._offset += n 

1297 tn = len(self._trailer) 

1298 if n >= 20: 

1299 to_pop = tn 

1300 to_add = 20 

1301 else: 

1302 to_pop = max(n + tn - 20, 0) 

1303 to_add = n 

1304 self.sha.update( 

1305 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)])) 

1306 ) 

1307 self._trailer.extend(data[-to_add:]) 

1308 

1309 # hash everything but the trailer 

1310 self.sha.update(data[:-to_add]) 

1311 return data 

1312 

1313 def _buf_len(self) -> int: 

1314 buf = self._rbuf 

1315 start = buf.tell() 

1316 buf.seek(0, SEEK_END) 

1317 end = buf.tell() 

1318 buf.seek(start) 

1319 return end - start 

1320 

1321 @property 

1322 def offset(self) -> int: 

1323 """Return current offset in the stream.""" 

1324 return self._offset - self._buf_len() 

1325 

1326 def read(self, size: int) -> bytes: 

1327 """Read, blocking until size bytes are read.""" 

1328 buf_len = self._buf_len() 

1329 if buf_len >= size: 

1330 return self._rbuf.read(size) 

1331 buf_data = self._rbuf.read() 

1332 self._rbuf = BytesIO() 

1333 return buf_data + self._read(self.read_all, size - buf_len) 

1334 

1335 def recv(self, size: int) -> bytes: 

1336 """Read up to size bytes, blocking until one byte is read.""" 

1337 buf_len = self._buf_len() 

1338 if buf_len: 

1339 data = self._rbuf.read(size) 

1340 if size >= buf_len: 

1341 self._rbuf = BytesIO() 

1342 return data 

1343 return self._read(self.read_some, size) 

1344 

1345 def __len__(self) -> int: 

1346 """Return the number of objects in this pack.""" 

1347 return self._num_objects 

1348 

1349 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]: 

1350 """Read the objects in this pack file. 

1351 

1352 Args: 

1353 compute_crc32: If True, compute the CRC32 of the compressed 

1354 data. If False, the returned CRC32 will be None. 

1355 Returns: Iterator over UnpackedObjects with the following members set: 

1356 offset 

1357 obj_type_num 

1358 obj_chunks (for non-delta types) 

1359 delta_base (for delta types) 

1360 decomp_chunks 

1361 decomp_len 

1362 crc32 (if compute_crc32 is True) 

1363 

1364 Raises: 

1365 ChecksumMismatch: if the checksum of the pack contents does not 

1366 match the checksum in the pack trailer. 

1367 zlib.error: if an error occurred during zlib decompression. 

1368 IOError: if an error occurred writing to the output file. 

1369 """ 

1370 _pack_version, self._num_objects = read_pack_header(self.read) 

1371 

1372 for _ in range(self._num_objects): 

1373 offset = self.offset 

1374 unpacked, unused = unpack_object( 

1375 self.read, 

1376 read_some=self.recv, 

1377 compute_crc32=compute_crc32, 

1378 zlib_bufsize=self._zlib_bufsize, 

1379 ) 

1380 unpacked.offset = offset 

1381 

1382 # prepend any unused data to current read buffer 

1383 buf = BytesIO() 

1384 buf.write(unused) 

1385 buf.write(self._rbuf.read()) 

1386 buf.seek(0) 

1387 self._rbuf = buf 

1388 

1389 yield unpacked 

1390 

1391 if self._buf_len() < 20: 

1392 # If the read buffer is full, then the last read() got the whole 

1393 # trailer off the wire. If not, it means there is still some of the 

1394 # trailer to read. We need to read() all 20 bytes; N come from the 

1395 # read buffer and (20 - N) come from the wire. 

1396 self.read(20) 

1397 

1398 pack_sha = bytearray(self._trailer) 

1399 if pack_sha != self.sha.digest(): 

1400 raise ChecksumMismatch(sha_to_hex(bytes(pack_sha)), self.sha.hexdigest()) 

1401 

1402 

1403class PackStreamCopier(PackStreamReader): 

1404 """Class to verify a pack stream as it is being read. 

1405 

1406 The pack is read from a ReceivableProtocol using read() or recv() as 

1407 appropriate and written out to the given file-like object. 

1408 """ 

1409 

1410 def __init__( 

1411 self, 

1412 read_all: Callable[[int], bytes], 

1413 read_some: Optional[Callable[[int], bytes]], 

1414 outfile: IO[bytes], 

1415 delta_iter: Optional["DeltaChainIterator[UnpackedObject]"] = None, 

1416 ) -> None: 

1417 """Initialize the copier. 

1418 

1419 Args: 

1420 read_all: Read function that blocks until the number of 

1421 requested bytes are read. 

1422 read_some: Read function that returns at least one byte, but may 

1423 not return the number of bytes requested. 

1424 outfile: File-like object to write output through. 

1425 delta_iter: Optional DeltaChainIterator to record deltas as we 

1426 read them. 

1427 """ 

1428 super().__init__(read_all, read_some=read_some) 

1429 self.outfile = outfile 

1430 self._delta_iter = delta_iter 

1431 

1432 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1433 """Read data from the read callback and write it to the file.""" 

1434 data = super()._read(read, size) 

1435 self.outfile.write(data) 

1436 return data 

1437 

1438 def verify(self, progress: Optional[Callable[..., None]] = None) -> None: 

1439 """Verify a pack stream and write it to the output file. 

1440 

1441 See PackStreamReader.read_objects for a list of exceptions this may 

1442 throw. 

1443 """ 

1444 i = 0 # default count of entries if read_objects() is empty 

1445 for i, unpacked in enumerate(self.read_objects()): 

1446 if self._delta_iter: 

1447 self._delta_iter.record(unpacked) 

1448 if progress is not None: 

1449 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii")) 

1450 if progress is not None: 

1451 progress(f"copied {i} pack entries\n".encode("ascii")) 

1452 

1453 

1454def obj_sha(type: int, chunks: Union[bytes, Iterable[bytes]]) -> bytes: 

1455 """Compute the SHA for a numeric type and object chunks.""" 

1456 sha = sha1() 

1457 sha.update(object_header(type, chunks_length(chunks))) 

1458 if isinstance(chunks, bytes): 

1459 sha.update(chunks) 

1460 else: 

1461 for chunk in chunks: 

1462 sha.update(chunk) 

1463 return sha.digest() 

1464 

1465 

1466def compute_file_sha( 

1467 f: IO[bytes], start_ofs: int = 0, end_ofs: int = 0, buffer_size: int = 1 << 16 

1468) -> "HashObject": 

1469 """Hash a portion of a file into a new SHA. 

1470 

1471 Args: 

1472 f: A file-like object to read from that supports seek(). 

1473 start_ofs: The offset in the file to start reading at. 

1474 end_ofs: The offset in the file to end reading at, relative to the 

1475 end of the file. 

1476 buffer_size: A buffer size for reading. 

1477 Returns: A new SHA object updated with data read from the file. 

1478 """ 

1479 sha = sha1() 

1480 f.seek(0, SEEK_END) 

1481 length = f.tell() 

1482 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length: 

1483 raise AssertionError( 

1484 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}" 

1485 ) 

1486 todo = length + end_ofs - start_ofs 

1487 f.seek(start_ofs) 

1488 while todo: 

1489 data = f.read(min(todo, buffer_size)) 

1490 sha.update(data) 

1491 todo -= len(data) 

1492 return sha 

1493 
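# Illustrative sketch of compute_file_sha skipping a 20-byte trailer, the
# same way calculate_checksum() below hashes a pack up to (but excluding)
# its trailing checksum. Hypothetical helper name.
def _example_compute_file_sha() -> None:
    body = b"pack data"
    f = BytesIO(body + b"\x00" * 20)
    assert compute_file_sha(f, end_ofs=-20).digest() == sha1(body).digest()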

1494 

1495class PackData: 

1496 """The data contained in a packfile. 

1497 

1498 Pack files can be accessed both sequentially for exploding a pack, and 

1499 directly with the help of an index to retrieve a specific object. 

1500 

1501 The objects within are either complete or a delta against another. 

1502 

1503 Each object starts with a variable-length header. If the MSB of a byte is 

1504 set then the subsequent byte is still part of the header. 

1505 In the first byte, the three bits after the MSB give the type, which tells 

1506 you the type of object and whether it is a delta. The low four bits are the 

1507 lowest bits of the size. In each subsequent byte the low 7 bits are the next 

1508 more significant bits of the size, i.e. the last header byte holds the most significant bits of the size. 

1509 

1510 For the complete objects the data is stored as zlib deflated data. 

1511 The size in the header is the uncompressed object size, so to uncompress 

1512 you need to just keep feeding data to zlib until you get an object back, 

1513 or it errors on bad data. This is done here by just giving the complete 

1514 buffer from the start of the deflated object on. This is bad, but until I 

1515 get mmap sorted out it will have to do. 

1516 

1517 Currently there are no integrity checks done. Also no attempt is made to 

1518 try and detect the delta case, or a request for an object at the wrong 

1519 position. It will all just throw a zlib or KeyError. 

1520 """ 

1521 
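    # Worked example of the header encoding described above: a 300-byte blob
    # (type 3) is encoded as the two bytes 0xBC 0x12. 0xBC = 0b1_011_1100:
    # MSB set (another header byte follows), type 0b011 = 3, low size bits
    # 0b1100 = 12. 0x12 = 0b0010010 = 18 supplies the next seven size bits,
    # so the size is 12 + (18 << 4) = 300.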

1522 def __init__( 

1523 self, 

1524 filename: Union[str, os.PathLike[str]], 

1525 file: Optional[IO[bytes]] = None, 

1526 size: Optional[int] = None, 

1527 *, 

1528 delta_window_size: Optional[int] = None, 

1529 window_memory: Optional[int] = None, 

1530 delta_cache_size: Optional[int] = None, 

1531 depth: Optional[int] = None, 

1532 threads: Optional[int] = None, 

1533 big_file_threshold: Optional[int] = None, 

1534 ) -> None: 

1535 """Create a PackData object representing the pack in the given filename. 

1536 

1537 The file must exist and stay readable until the object is disposed of. 

1538 It must also stay the same size. It will be mapped whenever needed. 

1539 

1540 Currently there is a restriction on the size of the pack as the python 

1541 mmap implementation is flawed. 

1542 """ 

1543 self._filename = filename 

1544 self._size = size 

1545 self._header_size = 12 

1546 self.delta_window_size = delta_window_size 

1547 self.window_memory = window_memory 

1548 self.delta_cache_size = delta_cache_size 

1549 self.depth = depth 

1550 self.threads = threads 

1551 self.big_file_threshold = big_file_threshold 

1552 self._file: IO[bytes] 

1553 

1554 if file is None: 

1555 self._file = GitFile(self._filename, "rb") 

1556 else: 

1557 self._file = file 

1558 (_version, self._num_objects) = read_pack_header(self._file.read) 

1559 

1560 # Use delta_cache_size config if available, otherwise default 

1561 cache_size = delta_cache_size or (1024 * 1024 * 20) 

1562 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]]( 

1563 cache_size, compute_size=_compute_object_size 

1564 ) 

1565 

1566 @property 

1567 def filename(self) -> str: 

1568 """Get the filename of the pack file. 

1569 

1570 Returns: 

1571 Base filename without directory path 

1572 """ 

1573 return os.path.basename(self._filename) 

1574 

1575 @property 

1576 def path(self) -> Union[str, os.PathLike[str]]: 

1577 """Get the full path of the pack file. 

1578 

1579 Returns: 

1580 Full path to the pack file 

1581 """ 

1582 return self._filename 

1583 

1584 @classmethod 

1585 def from_file(cls, file: IO[bytes], size: Optional[int] = None) -> "PackData": 

1586 """Create a PackData object from an open file. 

1587 

1588 Args: 

1589 file: Open file object 

1590 size: Optional file size 

1591 

1592 Returns: 

1593 PackData instance 

1594 """ 

1595 return cls(str(file), file=file, size=size) 

1596 

1597 @classmethod 

1598 def from_path(cls, path: Union[str, os.PathLike[str]]) -> "PackData": 

1599 """Create a PackData object from a file path. 

1600 

1601 Args: 

1602 path: Path to the pack file 

1603 

1604 Returns: 

1605 PackData instance 

1606 """ 

1607 return cls(filename=path) 

1608 

1609 def close(self) -> None: 

1610 """Close the underlying pack file.""" 

1611 self._file.close() 

1612 

1613 def __enter__(self) -> "PackData": 

1614 """Enter context manager.""" 

1615 return self 

1616 

1617 def __exit__( 

1618 self, 

1619 exc_type: Optional[type], 

1620 exc_val: Optional[BaseException], 

1621 exc_tb: Optional[TracebackType], 

1622 ) -> None: 

1623 """Exit context manager.""" 

1624 self.close() 

1625 

1626 def __eq__(self, other: object) -> bool: 

1627 """Check equality with another object.""" 

1628 if isinstance(other, PackData): 

1629 return self.get_stored_checksum() == other.get_stored_checksum() 

1630 return False 

1631 

1632 def _get_size(self) -> int: 

1633 if self._size is not None: 

1634 return self._size 

1635 self._size = os.path.getsize(self._filename) 

1636 if self._size < self._header_size: 

1637 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})" 

1638 raise AssertionError(errmsg) 

1639 return self._size 

1640 

1641 def __len__(self) -> int: 

1642 """Returns the number of objects in this pack.""" 

1643 return self._num_objects 

1644 

1645 def calculate_checksum(self) -> bytes: 

1646 """Calculate the checksum for this pack. 

1647 

1648 Returns: 20-byte binary SHA1 digest 

1649 """ 

1650 return compute_file_sha(self._file, end_ofs=-20).digest() 

1651 

1652 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]: 

1653 """Iterate over unpacked objects in the pack.""" 

1654 self._file.seek(self._header_size) 

1655 

1656 if self._num_objects is None: 

1657 return 

1658 

1659 for _ in range(self._num_objects): 

1660 offset = self._file.tell() 

1661 unpacked, unused = unpack_object( 

1662 self._file.read, compute_crc32=False, include_comp=include_comp 

1663 ) 

1664 unpacked.offset = offset 

1665 yield unpacked 

1666 # Back up over unused data. 

1667 self._file.seek(-len(unused), SEEK_CUR) 

1668 

1669 def iterentries( 

1670 self, 

1671 progress: Optional[Callable[[int, int], None]] = None, 

1672 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1673 ) -> Iterator[tuple[bytes, int, Optional[int]]]: 

1674 """Yield entries summarizing the contents of this pack. 

1675 

1676 Args: 

1677 progress: Progress function, called with current and total 

1678 object count. 

1679 resolve_ext_ref: Optional function to resolve external references 

1680 Returns: iterator of tuples with (sha, offset, crc32) 

1681 """ 

1682 num_objects = self._num_objects 

1683 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref) 

1684 for i, result in enumerate(indexer): 

1685 if progress is not None: 

1686 progress(i, num_objects) 

1687 yield result 

1688 

1689 def sorted_entries( 

1690 self, 

1691 progress: Optional[ProgressFn] = None, 

1692 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1693 ) -> list[tuple[bytes, int, int]]: 

1694 """Return entries in this pack, sorted by SHA. 

1695 

1696 Args: 

1697 progress: Progress function, called with current and total 

1698 object count 

1699 resolve_ext_ref: Optional function to resolve external references 

1700 Returns: List of tuples with (sha, offset, crc32) 

1701 """ 

1702 return sorted( 

1703 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) # type: ignore 

1704 ) 

1705 

1706 def create_index_v1( 

1707 self, 

1708 filename: str, 

1709 progress: Optional[Callable[..., None]] = None, 

1710 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1711 ) -> bytes: 

1712 """Create a version 1 file for this data file. 

1713 

1714 Args: 

1715 filename: Index filename. 

1716 progress: Progress report function 

1717 resolve_ext_ref: Optional function to resolve external references 

1718 Returns: Checksum of index file 

1719 """ 

1720 entries = self.sorted_entries( 

1721 progress=progress, resolve_ext_ref=resolve_ext_ref 

1722 ) 

1723 checksum = self.calculate_checksum() 

1724 with GitFile(filename, "wb") as f: 

1725 write_pack_index_v1( 

1726 f, 

1727 entries, 

1728 checksum, 

1729 ) 

1730 return checksum 

1731 

1732 def create_index_v2( 

1733 self, 

1734 filename: str, 

1735 progress: Optional[Callable[..., None]] = None, 

1736 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1737 ) -> bytes: 

1738 """Create a version 2 index file for this data file. 

1739 

1740 Args: 

1741 filename: Index filename. 

1742 progress: Progress report function 

1743 resolve_ext_ref: Optional function to resolve external references 

1744 Returns: Checksum of index file 

1745 """ 

1746 entries = self.sorted_entries( 

1747 progress=progress, resolve_ext_ref=resolve_ext_ref 

1748 ) 

1749 with GitFile(filename, "wb") as f: 

1750 return write_pack_index_v2(f, entries, self.calculate_checksum()) 

1751 

1752 def create_index_v3( 

1753 self, 

1754 filename: str, 

1755 progress: Optional[Callable[..., None]] = None, 

1756 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1757 hash_algorithm: int = 1, 

1758 ) -> bytes: 

1759 """Create a version 3 index file for this data file. 

1760 

1761 Args: 

1762 filename: Index filename. 

1763 progress: Progress report function 

1764 resolve_ext_ref: Function to resolve external references 

1765 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1766 Returns: Checksum of index file 

1767 """ 

1768 entries = self.sorted_entries( 

1769 progress=progress, resolve_ext_ref=resolve_ext_ref 

1770 ) 

1771 with GitFile(filename, "wb") as f: 

1772 return write_pack_index_v3( 

1773 f, entries, self.calculate_checksum(), hash_algorithm 

1774 ) 

1775 

1776 def create_index( 

1777 self, 

1778 filename: str, 

1779 progress: Optional[Callable[..., None]] = None, 

1780 version: int = 2, 

1781 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1782 hash_algorithm: int = 1, 

1783 ) -> bytes: 

1784 """Create an index file for this data file. 

1785 

1786 Args: 

1787 filename: Index filename. 

1788 progress: Progress report function 

1789 version: Index version (1, 2, or 3) 

1790 resolve_ext_ref: Function to resolve external references 

1791 hash_algorithm: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256) 

1792 Returns: Checksum of index file 

1793 """ 

1794 if version == 1: 

1795 return self.create_index_v1( 

1796 filename, progress, resolve_ext_ref=resolve_ext_ref 

1797 ) 

1798 elif version == 2: 

1799 return self.create_index_v2( 

1800 filename, progress, resolve_ext_ref=resolve_ext_ref 

1801 ) 

1802 elif version == 3: 

1803 return self.create_index_v3( 

1804 filename, 

1805 progress, 

1806 resolve_ext_ref=resolve_ext_ref, 

1807 hash_algorithm=hash_algorithm, 

1808 ) 

1809 else: 

1810 raise ValueError(f"unknown index format {version}") 

1811 

1812 def get_stored_checksum(self) -> bytes: 

1813 """Return the expected checksum stored in this pack.""" 

1814 self._file.seek(-20, SEEK_END) 

1815 return self._file.read(20) 

1816 

1817 def check(self) -> None: 

1818 """Check the consistency of this pack.""" 

1819 actual = self.calculate_checksum() 

1820 stored = self.get_stored_checksum() 

1821 if actual != stored: 

1822 raise ChecksumMismatch(stored, actual) 

1823 

1824 def get_unpacked_object_at( 

1825 self, offset: int, *, include_comp: bool = False 

1826 ) -> UnpackedObject: 

1827 """Given offset in the packfile return a UnpackedObject.""" 

1828 assert offset >= self._header_size 

1829 self._file.seek(offset) 

1830 unpacked, _ = unpack_object(self._file.read, include_comp=include_comp) 

1831 unpacked.offset = offset 

1832 return unpacked 

1833 

1834 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]: 

1835 """Given an offset in to the packfile return the object that is there. 

1836 

1837 Using the associated index the location of an object can be looked up, 

1838 and then the packfile can be asked directly for that object using this 

1839 function. 

1840 """ 

1841 try: 

1842 return self._offset_cache[offset] 

1843 except KeyError: 

1844 pass 

1845 unpacked = self.get_unpacked_object_at(offset, include_comp=False) 

1846 return (unpacked.pack_type_num, unpacked._obj()) 

1847 
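# Illustrative sketch (not part of dulwich): typical PackData usage, verifying
# the pack checksum and writing a v2 index next to it.  The pack path and the
# helper name are hypothetical.
def _example_index_pack_data() -> None:
    with PackData.from_path("objects/pack/pack-deadbeef.pack") as data:
        data.check()  # recompute the SHA-1 and compare with the stored trailer
        print(f"{len(data)} objects in pack")
        data.create_index("objects/pack/pack-deadbeef.idx", version=2)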

1848 

1849T = TypeVar("T") 

1850 

1851 

1852class DeltaChainIterator(Generic[T]): 

1853 """Abstract iterator over pack data based on delta chains. 

1854 

1855 Each object in the pack is guaranteed to be inflated exactly once, 

1856 regardless of how many objects reference it as a delta base. As a result, 

1857 memory usage is proportional to the length of the longest delta chain. 

1858 

1859 Subclasses can override _result to define the result type of the iterator. 

1860 By default, results are UnpackedObjects with the following members set: 

1861 

1862 * offset 

1863 * obj_type_num 

1864 * obj_chunks 

1865 * pack_type_num 

1866 * delta_base (for delta types) 

1867 * comp_chunks (if _include_comp is True) 

1868 * decomp_chunks 

1869 * decomp_len 

1870 * crc32 (if _compute_crc32 is True) 

1871 """ 

1872 

1873 _compute_crc32 = False 

1874 _include_comp = False 

1875 

1876 def __init__( 

1877 self, 

1878 file_obj: Optional[IO[bytes]], 

1879 *, 

1880 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1881 ) -> None: 

1882 """Initialize DeltaChainIterator. 

1883 

1884 Args: 

1885 file_obj: File object to read pack data from 

1886 resolve_ext_ref: Optional function to resolve external references 

1887 """ 

1888 self._file = file_obj 

1889 self._resolve_ext_ref = resolve_ext_ref 

1890 self._pending_ofs: dict[int, list[int]] = defaultdict(list) 

1891 self._pending_ref: dict[bytes, list[int]] = defaultdict(list) 

1892 self._full_ofs: list[tuple[int, int]] = [] 

1893 self._ext_refs: list[bytes] = [] 

1894 

1895 @classmethod 

1896 def for_pack_data( 

1897 cls, pack_data: PackData, resolve_ext_ref: Optional[ResolveExtRefFn] = None 

1898 ) -> "DeltaChainIterator[T]": 

1899 """Create a DeltaChainIterator from pack data. 

1900 

1901 Args: 

1902 pack_data: PackData object to iterate 

1903 resolve_ext_ref: Optional function to resolve external refs 

1904 

1905 Returns: 

1906 DeltaChainIterator instance 

1907 """ 

1908 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1909 walker.set_pack_data(pack_data) 

1910 for unpacked in pack_data.iter_unpacked(include_comp=False): 

1911 walker.record(unpacked) 

1912 return walker 

1913 

1914 @classmethod 

1915 def for_pack_subset( 

1916 cls, 

1917 pack: "Pack", 

1918 shas: Iterable[bytes], 

1919 *, 

1920 allow_missing: bool = False, 

1921 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1922 ) -> "DeltaChainIterator[T]": 

1923 """Create a DeltaChainIterator for a subset of objects. 

1924 

1925 Args: 

1926 pack: Pack object containing the data 

1927 shas: Iterable of object SHAs to include 

1928 allow_missing: If True, skip missing objects 

1929 resolve_ext_ref: Optional function to resolve external refs 

1930 

1931 Returns: 

1932 DeltaChainIterator instance 

1933 """ 

1934 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1935 walker.set_pack_data(pack.data) 

1936 todo = set() 

1937 for sha in shas: 

1938 assert isinstance(sha, bytes) 

1939 try: 

1940 off = pack.index.object_offset(sha) 

1941 except KeyError: 

1942 if not allow_missing: 

1943 raise 

1944 else: 

1945 todo.add(off) 

1946 done = set() 

1947 while todo: 

1948 off = todo.pop() 

1949 unpacked = pack.data.get_unpacked_object_at(off) 

1950 walker.record(unpacked) 

1951 done.add(off) 

1952 base_ofs = None 

1953 if unpacked.pack_type_num == OFS_DELTA: 

1954 assert unpacked.offset is not None 

1955 assert unpacked.delta_base is not None 

1956 assert isinstance(unpacked.delta_base, int) 

1957 base_ofs = unpacked.offset - unpacked.delta_base 

1958 elif unpacked.pack_type_num == REF_DELTA: 

1959 with suppress(KeyError): 

1960 assert isinstance(unpacked.delta_base, bytes) 

1961 base_ofs = pack.index.object_index(unpacked.delta_base) 

1962 if base_ofs is not None and base_ofs not in done: 

1963 todo.add(base_ofs) 

1964 return walker 

1965 

1966 def record(self, unpacked: UnpackedObject) -> None: 

1967 """Record an unpacked object for later processing. 

1968 

1969 Args: 

1970 unpacked: UnpackedObject to record 

1971 """ 

1972 type_num = unpacked.pack_type_num 

1973 offset = unpacked.offset 

1974 assert offset is not None 

1975 if type_num == OFS_DELTA: 

1976 assert unpacked.delta_base is not None 

1977 assert isinstance(unpacked.delta_base, int) 

1978 base_offset = offset - unpacked.delta_base 

1979 self._pending_ofs[base_offset].append(offset) 

1980 elif type_num == REF_DELTA: 

1981 assert isinstance(unpacked.delta_base, bytes) 

1982 self._pending_ref[unpacked.delta_base].append(offset) 

1983 else: 

1984 self._full_ofs.append((offset, type_num)) 

1985 

1986 def set_pack_data(self, pack_data: PackData) -> None: 

1987 """Set the pack data for iteration. 

1988 

1989 Args: 

1990 pack_data: PackData object to use 

1991 """ 

1992 self._file = pack_data._file 

1993 

1994 def _walk_all_chains(self) -> Iterator[T]: 

1995 for offset, type_num in self._full_ofs: 

1996 yield from self._follow_chain(offset, type_num, None) 

1997 yield from self._walk_ref_chains() 

1998 assert not self._pending_ofs, repr(self._pending_ofs) 

1999 

2000 def _ensure_no_pending(self) -> None: 

2001 if self._pending_ref: 

2002 raise UnresolvedDeltas([sha_to_hex(s) for s in self._pending_ref]) 

2003 

2004 def _walk_ref_chains(self) -> Iterator[T]: 

2005 if not self._resolve_ext_ref: 

2006 self._ensure_no_pending() 

2007 return 

2008 

2009 for base_sha, pending in sorted(self._pending_ref.items()): 

2010 if base_sha not in self._pending_ref: 

2011 continue 

2012 try: 

2013 type_num, chunks = self._resolve_ext_ref(base_sha) 

2014 except KeyError: 

2015 # Not an external ref, but may depend on one. Either it will 

2016 # get popped via a _follow_chain call, or we will raise an 

2017 # error below. 

2018 continue 

2019 self._ext_refs.append(base_sha) 

2020 self._pending_ref.pop(base_sha) 

2021 for new_offset in pending: 

2022 yield from self._follow_chain(new_offset, type_num, chunks) # type: ignore[arg-type] 

2023 

2024 self._ensure_no_pending() 

2025 

2026 def _result(self, unpacked: UnpackedObject) -> T: 

2027 raise NotImplementedError 

2028 

2029 def _resolve_object( 

2030 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2031 ) -> UnpackedObject: 

2032 assert self._file is not None 

2033 self._file.seek(offset) 

2034 unpacked, _ = unpack_object( 

2035 self._file.read, 

2036 include_comp=self._include_comp, 

2037 compute_crc32=self._compute_crc32, 

2038 ) 

2039 unpacked.offset = offset 

2040 if base_chunks is None: 

2041 assert unpacked.pack_type_num == obj_type_num 

2042 else: 

2043 assert unpacked.pack_type_num in DELTA_TYPES 

2044 unpacked.obj_type_num = obj_type_num 

2045 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks) 

2046 return unpacked 

2047 

2048 def _follow_chain( 

2049 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2050 ) -> Iterator[T]: 

2051 # Unlike PackData.get_object_at, there is no need to cache offsets as 

2052 # this approach by design inflates each object exactly once. 

2053 todo = [(offset, obj_type_num, base_chunks)] 

2054 while todo: 

2055 (offset, obj_type_num, base_chunks) = todo.pop() 

2056 unpacked = self._resolve_object(offset, obj_type_num, base_chunks) 

2057 yield self._result(unpacked) 

2058 

2059 assert unpacked.offset is not None 

2060 unblocked = chain( 

2061 self._pending_ofs.pop(unpacked.offset, []), 

2062 self._pending_ref.pop(unpacked.sha(), []), 

2063 ) 

2064 todo.extend( 

2065 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore 

2066 for new_offset in unblocked 

2067 ) 

2068 

2069 def __iter__(self) -> Iterator[T]: 

2070 """Iterate over objects in the pack.""" 

2071 return self._walk_all_chains() 

2072 

2073 def ext_refs(self) -> list[bytes]: 

2074 """Return external references.""" 

2075 return self._ext_refs 

2076 

2077 

2078class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]): 

2079 """Delta chain iterator that yield unpacked objects.""" 

2080 

2081 def _result(self, unpacked: UnpackedObject) -> UnpackedObject: 

2082 """Return the unpacked object. 

2083 

2084 Args: 

2085 unpacked: The unpacked object 

2086 

2087 Returns: 

2088 The unpacked object unchanged 

2089 """ 

2090 return unpacked 

2091 

2092 

2093class PackIndexer(DeltaChainIterator[PackIndexEntry]): 

2094 """Delta chain iterator that yields index entries.""" 

2095 

2096 _compute_crc32 = True 

2097 

2098 def _result(self, unpacked: UnpackedObject) -> tuple[bytes, int, Optional[int]]: 

2099 """Convert unpacked object to pack index entry. 

2100 

2101 Args: 

2102 unpacked: The unpacked object 

2103 

2104 Returns: 

2105 Tuple of (sha, offset, crc32) for index entry 

2106 """ 

2107 assert unpacked.offset is not None 

2108 return unpacked.sha(), unpacked.offset, unpacked.crc32 

2109 

2110 

2111class PackInflater(DeltaChainIterator[ShaFile]): 

2112 """Delta chain iterator that yields ShaFile objects.""" 

2113 

2114 def _result(self, unpacked: UnpackedObject) -> ShaFile: 

2115 """Convert unpacked object to ShaFile. 

2116 

2117 Args: 

2118 unpacked: The unpacked object 

2119 

2120 Returns: 

2121 ShaFile object from the unpacked data 

2122 """ 

2123 return unpacked.sha_file() 

2124 
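# Illustrative sketch (not part of dulwich): inflating every object in a
# self-contained (non-thin) pack into ShaFile instances.  The pack path and
# the helper name are hypothetical.
def _example_inflate_pack() -> None:
    with PackData.from_path("objects/pack/pack-deadbeef.pack") as data:
        for obj in PackInflater.for_pack_data(data):
            print(obj.id, obj.type_name)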

2125 

2126class SHA1Reader(BinaryIO): 

2127 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2128 

2129 def __init__(self, f: IO[bytes]) -> None: 

2130 """Initialize SHA1Reader. 

2131 

2132 Args: 

2133 f: File-like object to wrap 

2134 """ 

2135 self.f = f 

2136 self.sha1 = sha1(b"") 

2137 

2138 def read(self, size: int = -1) -> bytes: 

2139 """Read bytes and update SHA1. 

2140 

2141 Args: 

2142 size: Number of bytes to read, -1 for all 

2143 

2144 Returns: 

2145 Bytes read from file 

2146 """ 

2147 data = self.f.read(size) 

2148 self.sha1.update(data) 

2149 return data 

2150 

2151 def check_sha(self, allow_empty: bool = False) -> None: 

2152 """Check if the SHA1 matches the expected value. 

2153 

2154 Args: 

2155 allow_empty: Allow empty SHA1 hash 

2156 

2157 Raises: 

2158 ChecksumMismatch: If SHA1 doesn't match 

2159 """ 

2160 stored = self.f.read(20) 

2161 # If git option index.skipHash is set the index will be empty 

2162 if stored != self.sha1.digest() and ( 

2163 not allow_empty 

2164 or sha_to_hex(stored) != b"0000000000000000000000000000000000000000" 

2165 ): 

2166 raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored)) 

2167 

2168 def close(self) -> None: 

2169 """Close the underlying file.""" 

2170 return self.f.close() 

2171 

2172 def tell(self) -> int: 

2173 """Return current file position.""" 

2174 return self.f.tell() 

2175 

2176 # BinaryIO abstract methods 

2177 def readable(self) -> bool: 

2178 """Check if file is readable.""" 

2179 return True 

2180 

2181 def writable(self) -> bool: 

2182 """Check if file is writable.""" 

2183 return False 

2184 

2185 def seekable(self) -> bool: 

2186 """Check if file is seekable.""" 

2187 return getattr(self.f, "seekable", lambda: False)() 

2188 

2189 def seek(self, offset: int, whence: int = 0) -> int: 

2190 """Seek to position in file. 

2191 

2192 Args: 

2193 offset: Position offset 

2194 whence: Reference point (0=start, 1=current, 2=end) 

2195 

2196 Returns: 

2197 New file position 

2198 """ 

2199 return self.f.seek(offset, whence) 

2200 

2201 def flush(self) -> None: 

2202 """Flush the file buffer.""" 

2203 if hasattr(self.f, "flush"): 

2204 self.f.flush() 

2205 

2206 def readline(self, size: int = -1) -> bytes: 

2207 """Read a line from the file. 

2208 

2209 Args: 

2210 size: Maximum bytes to read 

2211 

2212 Returns: 

2213 Line read from file 

2214 """ 

2215 return self.f.readline(size) 

2216 

2217 def readlines(self, hint: int = -1) -> list[bytes]: 

2218 """Read all lines from the file. 

2219 

2220 Args: 

2221 hint: Approximate number of bytes to read 

2222 

2223 Returns: 

2224 List of lines 

2225 """ 

2226 return self.f.readlines(hint) 

2227 

2228 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2229 """Write multiple lines to the file (not supported).""" 

2230 raise UnsupportedOperation("writelines") 

2231 

2232 def write(self, data: bytes, /) -> int: # type: ignore[override] 

2233 """Write data to the file (not supported).""" 

2234 raise UnsupportedOperation("write") 

2235 

2236 def __enter__(self) -> "SHA1Reader": 

2237 """Enter context manager.""" 

2238 return self 

2239 

2240 def __exit__( 

2241 self, 

2242 type: Optional[type], 

2243 value: Optional[BaseException], 

2244 traceback: Optional[TracebackType], 

2245 ) -> None: 

2246 """Exit context manager and close file.""" 

2247 self.close() 

2248 

2249 def __iter__(self) -> "SHA1Reader": 

2250 """Return iterator for reading file lines.""" 

2251 return self 

2252 

2253 def __next__(self) -> bytes: 

2254 """Get next line from file. 

2255 

2256 Returns: 

2257 Next line 

2258 

2259 Raises: 

2260 StopIteration: When no more lines 

2261 """ 

2262 line = self.readline() 

2263 if not line: 

2264 raise StopIteration 

2265 return line 

2266 

2267 def fileno(self) -> int: 

2268 """Return file descriptor number.""" 

2269 return self.f.fileno() 

2270 

2271 def isatty(self) -> bool: 

2272 """Check if file is a terminal.""" 

2273 return getattr(self.f, "isatty", lambda: False)() 

2274 

2275 def truncate(self, size: Optional[int] = None) -> int: 

2276 """Not supported for read-only file. 

2277 

2278 Raises: 

2279 UnsupportedOperation: Always raised 

2280 """ 

2281 raise UnsupportedOperation("truncate") 

2282 

2283 

2284class SHA1Writer(BinaryIO): 

2285 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2286 

2287 def __init__(self, f: Union[BinaryIO, IO[bytes]]) -> None: 

2288 """Initialize SHA1Writer. 

2289 

2290 Args: 

2291 f: File-like object to wrap 

2292 """ 

2293 self.f = f 

2294 self.length = 0 

2295 self.sha1 = sha1(b"") 

2296 self.digest: Optional[bytes] = None 

2297 

2298 def write(self, data: Union[bytes, bytearray, memoryview], /) -> int: # type: ignore[override] 

2299 """Write data and update SHA1. 

2300 

2301 Args: 

2302 data: Data to write 

2303 

2304 Returns: 

2305 Number of bytes written 

2306 """ 

2307 self.sha1.update(data) 

2308 written = self.f.write(data) 

2309 self.length += written 

2310 return written 

2311 

2312 def write_sha(self) -> bytes: 

2313 """Write the SHA1 digest to the file. 

2314 

2315 Returns: 

2316 The SHA1 digest bytes 

2317 """ 

2318 sha = self.sha1.digest() 

2319 assert len(sha) == 20 

2320 self.f.write(sha) 

2321 self.length += len(sha) 

2322 return sha 

2323 

2324 def close(self) -> None: 

2325 """Close the pack file and finalize the SHA.""" 

2326 self.digest = self.write_sha() 

2327 self.f.close() 

2328 

2329 def offset(self) -> int: 

2330 """Get the total number of bytes written. 

2331 

2332 Returns: 

2333 Total bytes written 

2334 """ 

2335 return self.length 

2336 

2337 def tell(self) -> int: 

2338 """Return current file position.""" 

2339 return self.f.tell() 

2340 

2341 # BinaryIO abstract methods 

2342 def readable(self) -> bool: 

2343 """Check if file is readable.""" 

2344 return False 

2345 

2346 def writable(self) -> bool: 

2347 """Check if file is writable.""" 

2348 return True 

2349 

2350 def seekable(self) -> bool: 

2351 """Check if file is seekable.""" 

2352 return getattr(self.f, "seekable", lambda: False)() 

2353 

2354 def seek(self, offset: int, whence: int = 0) -> int: 

2355 """Seek to position in file. 

2356 

2357 Args: 

2358 offset: Position offset 

2359 whence: Reference point (0=start, 1=current, 2=end) 

2360 

2361 Returns: 

2362 New file position 

2363 """ 

2364 return self.f.seek(offset, whence) 

2365 

2366 def flush(self) -> None: 

2367 """Flush the file buffer.""" 

2368 if hasattr(self.f, "flush"): 

2369 self.f.flush() 

2370 

2371 def readline(self, size: int = -1) -> bytes: 

2372 """Not supported for write-only file. 

2373 

2374 Raises: 

2375 UnsupportedOperation: Always raised 

2376 """ 

2377 raise UnsupportedOperation("readline") 

2378 

2379 def readlines(self, hint: int = -1) -> list[bytes]: 

2380 """Not supported for write-only file. 

2381 

2382 Raises: 

2383 UnsupportedOperation: Always raised 

2384 """ 

2385 raise UnsupportedOperation("readlines") 

2386 

2387 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2388 """Write multiple lines to the file. 

2389 

2390 Args: 

2391 lines: Iterable of lines to write 

2392 """ 

2393 for line in lines: 

2394 self.write(line) 

2395 

2396 def read(self, size: int = -1) -> bytes: 

2397 """Not supported for write-only file. 

2398 

2399 Raises: 

2400 UnsupportedOperation: Always raised 

2401 """ 

2402 raise UnsupportedOperation("read") 

2403 

2404 def __enter__(self) -> "SHA1Writer": 

2405 """Enter context manager.""" 

2406 return self 

2407 

2408 def __exit__( 

2409 self, 

2410 type: Optional[type], 

2411 value: Optional[BaseException], 

2412 traceback: Optional[TracebackType], 

2413 ) -> None: 

2414 """Exit context manager and close file.""" 

2415 self.close() 

2416 

2417 def __iter__(self) -> "SHA1Writer": 

2418 """Return iterator.""" 

2419 return self 

2420 

2421 def __next__(self) -> bytes: 

2422 """Not supported for write-only file. 

2423 

2424 Raises: 

2425 UnsupportedOperation: Always raised 

2426 """ 

2427 raise UnsupportedOperation("__next__") 

2428 

2429 def fileno(self) -> int: 

2430 """Return file descriptor number.""" 

2431 return self.f.fileno() 

2432 

2433 def isatty(self) -> bool: 

2434 """Check if file is a terminal.""" 

2435 return getattr(self.f, "isatty", lambda: False)() 

2436 

2437 def truncate(self, size: Optional[int] = None) -> int: 

2438 """Not supported for write-only file. 

2439 

2440 Raises: 

2441 UnsupportedOperation: Always raised 

2442 """ 

2443 raise UnsupportedOperation("truncate") 

2444 
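# Illustrative sketch (not part of dulwich): SHA1Writer hashes everything
# written and appends the 20-byte digest as a trailer, which is how pack and
# index files are checksummed.  The helper name is hypothetical.
def _example_sha1_writer() -> None:
    buf = BytesIO()
    writer = SHA1Writer(buf)
    writer.write(b"some pack bytes")
    trailer = writer.write_sha()  # 20-byte SHA-1 of the bytes written so far
    assert buf.getvalue().endswith(trailer)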

2445 

2446def pack_object_header( 

2447 type_num: int, delta_base: Optional[Union[bytes, int]], size: int 

2448) -> bytearray: 

2449 """Create a pack object header for the given object info. 

2450 

2451 Args: 

2452 type_num: Numeric type of the object. 

2453 delta_base: Delta base offset or ref, or None for whole objects. 

2454 size: Uncompressed object size. 

2455 Returns: A header for a packed object. 

2456 """ 

2457 header = [] 

2458 c = (type_num << 4) | (size & 15) 

2459 size >>= 4 

2460 while size: 

2461 header.append(c | 0x80) 

2462 c = size & 0x7F 

2463 size >>= 7 

2464 header.append(c) 

2465 if type_num == OFS_DELTA: 

2466 assert isinstance(delta_base, int) 

2467 ret = [delta_base & 0x7F] 

2468 delta_base >>= 7 

2469 while delta_base: 

2470 delta_base -= 1 

2471 ret.insert(0, 0x80 | (delta_base & 0x7F)) 

2472 delta_base >>= 7 

2473 header.extend(ret) 

2474 elif type_num == REF_DELTA: 

2475 assert isinstance(delta_base, bytes) 

2476 assert len(delta_base) == 20 

2477 header += delta_base 

2478 return bytearray(header) 

2479 
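# Illustrative worked example (not part of dulwich): the first header byte
# carries the type in bits 4-6 and the low 4 bits of the size; the remaining
# size bits follow as a little-endian varint.  For a 100-byte blob (type 3):
# (3 << 4) | (100 & 15) = 0x34 with the continuation bit set, then 100 >> 4 = 6.
def _example_object_header() -> None:
    header = pack_object_header(3, None, 100)
    assert bytes(header) == bytes([0x34 | 0x80, 0x06])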

2480 

2481def pack_object_chunks( 

2482 type: int, 

2483 object: Union[list[bytes], tuple[Union[bytes, int], list[bytes]]], 

2484 compression_level: int = -1, 

2485) -> Iterator[bytes]: 

2486 """Generate chunks for a pack object. 

2487 

2488 Args: 

2489 type: Numeric type of the object 

2490 object: Object to write 

2491 compression_level: the zlib compression level 

2492 Returns: Chunks 

2493 """ 

2494 if type in DELTA_TYPES: 

2495 if isinstance(object, tuple): 

2496 delta_base, object = object 

2497 else: 

2498 raise TypeError("Delta types require a tuple of (delta_base, object)") 

2499 else: 

2500 delta_base = None 

2501 

2502 # Convert object to list of bytes chunks 

2503 if isinstance(object, bytes): 

2504 chunks = [object] 

2505 elif isinstance(object, list): 

2506 chunks = object 

2507 elif isinstance(object, ShaFile): 

2508 chunks = object.as_raw_chunks() 

2509 else: 

2510 # Shouldn't reach here with proper typing 

2511 raise TypeError(f"Unexpected object type: {object.__class__.__name__}") 

2512 

2513 yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks)))) 

2514 compressor = zlib.compressobj(level=compression_level) 

2515 for data in chunks: 

2516 yield compressor.compress(data) 

2517 yield compressor.flush() 

2518 

2519 

2520def write_pack_object( 

2521 write: Callable[[bytes], int], 

2522 type: int, 

2523 object: Union[list[bytes], tuple[Union[bytes, int], list[bytes]]], 

2524 sha: Optional["HashObject"] = None, 

2525 compression_level: int = -1, 

2526) -> int: 

2527 """Write pack object to a file. 

2528 

2529 Args: 

2530 write: Write function to use 

2531 type: Numeric type of the object 

2532 object: Object to write 

2533 sha: Optional SHA-1 hasher to update 

2534 compression_level: the zlib compression level 

2535 Returns: CRC32 checksum of the written object 

2536 """ 

2537 crc32 = 0 

2538 for chunk in pack_object_chunks(type, object, compression_level=compression_level): 

2539 write(chunk) 

2540 if sha is not None: 

2541 sha.update(chunk) 

2542 crc32 = binascii.crc32(chunk, crc32) 

2543 return crc32 & 0xFFFFFFFF 

2544 
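# Illustrative sketch (not part of dulwich): writing one full (non-delta)
# object into a pack stream and capturing the CRC32 that the index will need.
# The helper name is hypothetical.
def _example_write_single_object() -> None:
    from dulwich.objects import Blob

    blob = Blob.from_string(b"hello world\n")
    out = BytesIO()
    crc32 = write_pack_object(out.write, blob.type_num, blob.as_raw_chunks())
    print(f"wrote {out.tell()} bytes, crc32={crc32:#010x}")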

2545 

2546def write_pack( 

2547 filename: str, 

2548 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2549 *, 

2550 deltify: Optional[bool] = None, 

2551 delta_window_size: Optional[int] = None, 

2552 compression_level: int = -1, 

2553) -> tuple[bytes, bytes]: 

2554 """Write a new pack data file. 

2555 

2556 Args: 

2557 filename: Path to the new pack file (without .pack extension) 

2558 objects: Objects to write to the pack 

2559 delta_window_size: Delta window size 

2560 deltify: Whether to deltify pack objects 

2561 compression_level: the zlib compression level 

2562 Returns: Tuple with checksum of pack file and index file 

2563 """ 

2564 with GitFile(filename + ".pack", "wb") as f: 

2565 entries, data_sum = write_pack_objects( 

2566 f, 

2567 objects, 

2568 delta_window_size=delta_window_size, 

2569 deltify=deltify, 

2570 compression_level=compression_level, 

2571 ) 

2572 entries_list = sorted([(k, v[0], v[1]) for (k, v) in entries.items()]) 

2573 with GitFile(filename + ".idx", "wb") as f: 

2574 idx_sha = write_pack_index(f, entries_list, data_sum) 

2575 return data_sum, idx_sha 

2576 
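# Illustrative sketch (not part of dulwich): writing a small pack and its index
# from in-memory objects.  write_pack appends ".pack" and ".idx" to the given
# basename; the basename and the helper name here are hypothetical.
def _example_write_pack() -> None:
    from dulwich.objects import Blob

    objects = [(Blob.from_string(b"a\n"), None), (Blob.from_string(b"b\n"), None)]
    pack_sha, idx_sha = write_pack("/tmp/pack-example", objects, deltify=False)
    print(sha_to_hex(pack_sha), sha_to_hex(idx_sha))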

2577 

2578def pack_header_chunks(num_objects: int) -> Iterator[bytes]: 

2579 """Yield chunks for a pack header.""" 

2580 yield b"PACK" # Pack header 

2581 yield struct.pack(b">L", 2) # Pack version 

2582 yield struct.pack(b">L", num_objects) # Number of objects in pack 

2583 

2584 

2585def write_pack_header( 

2586 write: Union[Callable[[bytes], int], IO[bytes]], num_objects: int 

2587) -> None: 

2588 """Write a pack header for the given number of objects.""" 

2589 write_fn: Callable[[bytes], int] 

2590 if hasattr(write, "write"): 

2591 write_fn = write.write 

2592 warnings.warn( 

2593 "write_pack_header() now takes a write rather than file argument", 

2594 DeprecationWarning, 

2595 stacklevel=2, 

2596 ) 

2597 else: 

2598 write_fn = write 

2599 for chunk in pack_header_chunks(num_objects): 

2600 write_fn(chunk) 

2601 

2602 

2603def find_reusable_deltas( 

2604 container: PackedObjectContainer, 

2605 object_ids: Set[bytes], 

2606 *, 

2607 other_haves: Optional[Set[bytes]] = None, 

2608 progress: Optional[Callable[..., None]] = None, 

2609) -> Iterator[UnpackedObject]: 

2610 """Find deltas in a pack that can be reused. 

2611 

2612 Args: 

2613 container: Pack container to search for deltas 

2614 object_ids: Set of object IDs to find deltas for 

2615 other_haves: Set of other object IDs we have 

2616 progress: Optional progress reporting callback 

2617 

2618 Returns: 

2619 Iterator of UnpackedObject entries that can be reused 

2620 """ 

2621 if other_haves is None: 

2622 other_haves = set() 

2623 reused = 0 

2624 for i, unpacked in enumerate( 

2625 container.iter_unpacked_subset( 

2626 object_ids, allow_missing=True, convert_ofs_delta=True 

2627 ) 

2628 ): 

2629 if progress is not None and i % 1000 == 0: 

2630 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode()) 

2631 if unpacked.pack_type_num == REF_DELTA: 

2632 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore 

2633 if hexsha in object_ids or hexsha in other_haves: 

2634 yield unpacked 

2635 reused += 1 

2636 if progress is not None: 

2637 progress((f"found {reused} deltas to reuse\n").encode()) 

2638 

2639 

2640def deltify_pack_objects( 

2641 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[bytes]]]], 

2642 *, 

2643 window_size: Optional[int] = None, 

2644 progress: Optional[Callable[..., None]] = None, 

2645) -> Iterator[UnpackedObject]: 

2646 """Generate deltas for pack objects. 

2647 

2648 Args: 

2649 objects: An iterable of (object, path) tuples to deltify. 

2650 window_size: Window size; None for default 

2651 progress: Optional progress reporting callback 

2652 Returns: Iterator over UnpackedObject entries; 

2653 delta_base is None for full text entries 

2654 """ 

2655 

2656 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, Optional[bytes]]]]: 

2657 for e in objects: 

2658 if isinstance(e, ShaFile): 

2659 yield (e, (e.type_num, None)) 

2660 else: 

2661 yield (e[0], (e[0].type_num, e[1])) 

2662 

2663 sorted_objs = sort_objects_for_delta(objects_with_hints()) 

2664 yield from deltas_from_sorted_objects( 

2665 sorted_objs, 

2666 window_size=window_size, 

2667 progress=progress, 

2668 ) 

2669 

2670 

2671def sort_objects_for_delta( 

2672 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[PackHint]]]], 

2673) -> Iterator[tuple[ShaFile, Optional[bytes]]]: 

2674 """Sort objects for optimal delta compression. 

2675 

2676 Args: 

2677 objects: Iterator of objects or (object, hint) tuples 

2678 

2679 Returns: 

2680 Iterator of sorted (ShaFile, path) tuples 

2681 """ 

2682 magic = [] 

2683 for entry in objects: 

2684 if isinstance(entry, tuple): 

2685 obj, hint = entry 

2686 if hint is None: 

2687 type_num = None 

2688 path = None 

2689 else: 

2690 (type_num, path) = hint 

2691 else: 

2692 obj = entry 

2693 type_num = None 

2694 path = None 

2695 magic.append((type_num, path, -obj.raw_length(), obj)) 

2696 # Build a list of objects ordered by the magic Linus heuristic 

2697 # This helps us find good objects to diff against us 

2698 magic.sort() 

2699 return ((x[3], x[1]) for x in magic) 

2700 

2701 

2702def deltas_from_sorted_objects( 

2703 objects: Iterator[tuple[ShaFile, Optional[bytes]]], 

2704 window_size: Optional[int] = None, 

2705 progress: Optional[Callable[..., None]] = None, 

2706) -> Iterator[UnpackedObject]: 

2707 """Create deltas from sorted objects. 

2708 

2709 Args: 

2710 objects: Iterator of sorted objects to deltify 

2711 window_size: Delta window size; None for default 

2712 progress: Optional progress reporting callback 

2713 

2714 Returns: 

2715 Iterator of UnpackedObject entries 

2716 """ 

2717 # TODO(jelmer): Use threads 

2718 if window_size is None: 

2719 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE 

2720 

2721 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque() 

2722 for i, (o, path) in enumerate(objects): 

2723 if progress is not None and i % 1000 == 0: 

2724 progress((f"generating deltas: {i}\r").encode()) 

2725 raw = o.as_raw_chunks() 

2726 winner = raw 

2727 winner_len = sum(map(len, winner)) 

2728 winner_base = None 

2729 for base_id, base_type_num, base in possible_bases: 

2730 if base_type_num != o.type_num: 

2731 continue 

2732 delta_len = 0 

2733 delta = [] 

2734 for chunk in create_delta(b"".join(base), b"".join(raw)): 

2735 delta_len += len(chunk) 

2736 if delta_len >= winner_len: 

2737 break 

2738 delta.append(chunk) 

2739 else: 

2740 winner_base = base_id 

2741 winner = delta 

2742 winner_len = sum(map(len, winner)) 

2743 yield UnpackedObject( 

2744 o.type_num, 

2745 sha=o.sha().digest(), 

2746 delta_base=winner_base, 

2747 decomp_len=winner_len, 

2748 decomp_chunks=winner, 

2749 ) 

2750 possible_bases.appendleft((o.sha().digest(), o.type_num, raw)) 

2751 while len(possible_bases) > window_size: 

2752 possible_bases.pop() 

2753 
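# Illustrative sketch (not part of dulwich): deltifying two similar blobs with
# the pure-Python path above.  One object comes out as full text and the other
# as a delta when the delta is smaller.  The helper name is hypothetical.
def _example_deltify() -> None:
    from dulwich.objects import Blob

    base = Blob.from_string(b"line\n" * 100)
    similar = Blob.from_string(b"line\n" * 99 + b"changed\n")
    for unpacked in deltify_pack_objects(iter([base, similar])):
        print(sha_to_hex(unpacked.sha()), unpacked.delta_base is not None)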

2754 

2755def pack_objects_to_data( 

2756 objects: Union[ 

2757 Sequence[ShaFile], 

2758 Sequence[tuple[ShaFile, Optional[bytes]]], 

2759 Sequence[tuple[ShaFile, Optional[PackHint]]], 

2760 ], 

2761 *, 

2762 deltify: Optional[bool] = None, 

2763 delta_window_size: Optional[int] = None, 

2764 ofs_delta: bool = True, 

2765 progress: Optional[Callable[..., None]] = None, 

2766) -> tuple[int, Iterator[UnpackedObject]]: 

2767 """Create pack data from objects. 

2768 

2769 Args: 

2770 objects: Pack objects 

2771 deltify: Whether to deltify pack objects 

2772 delta_window_size: Delta window size 

2773 ofs_delta: Whether to use offset deltas 

2774 progress: Optional progress reporting callback 

2775 Returns: Tuple of (object count, iterator over UnpackedObject entries) 

2776 """ 

2777 count = len(objects) 

2778 if deltify is None: 

2779 # PERFORMANCE/TODO(jelmer): This should be enabled but the python 

2780 # implementation is *much* too slow at the moment. 

2781 # Maybe consider enabling it just if the rust extension is available? 

2782 deltify = False 

2783 if deltify: 

2784 return ( 

2785 count, 

2786 deltify_pack_objects( 

2787 iter(objects), # type: ignore 

2788 window_size=delta_window_size, 

2789 progress=progress, 

2790 ), 

2791 ) 

2792 else: 

2793 

2794 def iter_without_path() -> Iterator[UnpackedObject]: 

2795 for o in objects: 

2796 if isinstance(o, tuple): 

2797 yield full_unpacked_object(o[0]) 

2798 else: 

2799 yield full_unpacked_object(o) 

2800 

2801 return (count, iter_without_path()) 

2802 

2803 

2804def generate_unpacked_objects( 

2805 container: PackedObjectContainer, 

2806 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2807 delta_window_size: Optional[int] = None, 

2808 deltify: Optional[bool] = None, 

2809 reuse_deltas: bool = True, 

2810 ofs_delta: bool = True, 

2811 other_haves: Optional[set[bytes]] = None, 

2812 progress: Optional[Callable[..., None]] = None, 

2813) -> Iterator[UnpackedObject]: 

2814 """Create pack data from objects. 

2815 

2816 Returns: Iterator over UnpackedObject entries 

2817 """ 

2818 todo = dict(object_ids) 

2819 if reuse_deltas: 

2820 for unpack in find_reusable_deltas( 

2821 container, set(todo), other_haves=other_haves, progress=progress 

2822 ): 

2823 del todo[sha_to_hex(unpack.sha())] 

2824 yield unpack 

2825 if deltify is None: 

2826 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2827 # slow at the moment. 

2828 deltify = False 

2829 if deltify: 

2830 objects_to_delta = container.iterobjects_subset( 

2831 todo.keys(), allow_missing=False 

2832 ) 

2833 sorted_objs = sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta) 

2834 yield from deltas_from_sorted_objects( 

2835 sorted_objs, 

2836 window_size=delta_window_size, 

2837 progress=progress, 

2838 ) 

2839 else: 

2840 for oid in todo: 

2841 yield full_unpacked_object(container[oid]) 

2842 

2843 

2844def full_unpacked_object(o: ShaFile) -> UnpackedObject: 

2845 """Create an UnpackedObject from a ShaFile. 

2846 

2847 Args: 

2848 o: ShaFile object to convert 

2849 

2850 Returns: 

2851 UnpackedObject with full object data 

2852 """ 

2853 return UnpackedObject( 

2854 o.type_num, 

2855 delta_base=None, 

2856 crc32=None, 

2857 decomp_chunks=o.as_raw_chunks(), 

2858 sha=o.sha().digest(), 

2859 ) 

2860 

2861 

2862def write_pack_from_container( 

2863 write: Union[ 

2864 Callable[[bytes], None], 

2865 Callable[[Union[bytes, bytearray, memoryview]], int], 

2866 IO[bytes], 

2867 ], 

2868 container: PackedObjectContainer, 

2869 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2870 delta_window_size: Optional[int] = None, 

2871 deltify: Optional[bool] = None, 

2872 reuse_deltas: bool = True, 

2873 compression_level: int = -1, 

2874 other_haves: Optional[set[bytes]] = None, 

2875) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

2876 """Write a new pack data file. 

2877 

2878 Args: 

2879 write: write function to use 

2880 container: PackedObjectContainer 

2881 object_ids: Sequence of (object_id, hint) tuples to write 

2882 delta_window_size: Sliding window size for searching for deltas; 

2883 Set to None for default window size. 

2884 deltify: Whether to deltify objects 

2885 reuse_deltas: Whether to reuse existing deltas 

2886 compression_level: the zlib compression level to use 

2887 other_haves: Set of additional object IDs the receiver has 

2888 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2889 """ 

2890 pack_contents_count = len(object_ids) 

2891 pack_contents = generate_unpacked_objects( 

2892 container, 

2893 object_ids, 

2894 delta_window_size=delta_window_size, 

2895 deltify=deltify, 

2896 reuse_deltas=reuse_deltas, 

2897 other_haves=other_haves, 

2898 ) 

2899 

2900 return write_pack_data( 

2901 write, 

2902 pack_contents, 

2903 num_records=pack_contents_count, 

2904 compression_level=compression_level, 

2905 ) 

2906 

2907 

2908def write_pack_objects( 

2909 write: Union[Callable[[bytes], None], IO[bytes]], 

2910 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2911 *, 

2912 delta_window_size: Optional[int] = None, 

2913 deltify: Optional[bool] = None, 

2914 compression_level: int = -1, 

2915) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

2916 """Write a new pack data file. 

2917 

2918 Args: 

2919 write: write function to use 

2920 objects: Sequence of (object, path) tuples to write 

2921 delta_window_size: Sliding window size for searching for deltas; 

2922 Set to None for default window size. 

2923 deltify: Whether to deltify objects 

2924 compression_level: the zlib compression level to use 

2925 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2926 """ 

2927 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify) 

2928 

2929 return write_pack_data( 

2930 write, 

2931 pack_contents, 

2932 num_records=pack_contents_count, 

2933 compression_level=compression_level, 

2934 ) 

2935 

2936 

2937class PackChunkGenerator: 

2938 """Generator for pack data chunks.""" 

2939 

2940 def __init__( 

2941 self, 

2942 num_records: Optional[int] = None, 

2943 records: Optional[Iterator[UnpackedObject]] = None, 

2944 progress: Optional[Callable[..., None]] = None, 

2945 compression_level: int = -1, 

2946 reuse_compressed: bool = True, 

2947 ) -> None: 

2948 """Initialize PackChunkGenerator. 

2949 

2950 Args: 

2951 num_records: Expected number of records 

2952 records: Iterator of pack records 

2953 progress: Optional progress callback 

2954 compression_level: Compression level (-1 for default) 

2955 reuse_compressed: Whether to reuse compressed chunks 

2956 """ 

2957 self.cs = sha1(b"") 

2958 self.entries: dict[bytes, tuple[int, int]] = {} 

2959 if records is None: 

2960 records = iter([]) # Empty iterator if None 

2961 self._it = self._pack_data_chunks( 

2962 records=records, 

2963 num_records=num_records, 

2964 progress=progress, 

2965 compression_level=compression_level, 

2966 reuse_compressed=reuse_compressed, 

2967 ) 

2968 

2969 def sha1digest(self) -> bytes: 

2970 """Return the SHA1 digest of the pack data.""" 

2971 return self.cs.digest() 

2972 

2973 def __iter__(self) -> Iterator[bytes]: 

2974 """Iterate over pack data chunks.""" 

2975 return self._it 

2976 

2977 def _pack_data_chunks( 

2978 self, 

2979 records: Iterator[UnpackedObject], 

2980 *, 

2981 num_records: Optional[int] = None, 

2982 progress: Optional[Callable[..., None]] = None, 

2983 compression_level: int = -1, 

2984 reuse_compressed: bool = True, 

2985 ) -> Iterator[bytes]: 

2986 """Iterate pack data file chunks. 

2987 

2988 Args: 

2989 records: Iterator over UnpackedObject 

2990 num_records: Number of records (defaults to len(records) if not specified) 

2991 progress: Function to report progress to 

2992 compression_level: the zlib compression level 

2993 reuse_compressed: Whether to reuse compressed chunks 

2994 Yields: Pack data chunks; per-object (offset, crc32) entries are recorded in self.entries 

2995 """ 

2996 # Write the pack 

2997 if num_records is None: 

2998 num_records = len(records) # type: ignore 

2999 offset = 0 

3000 for chunk in pack_header_chunks(num_records): 

3001 yield chunk 

3002 self.cs.update(chunk) 

3003 offset += len(chunk) 

3004 actual_num_records = 0 

3005 for i, unpacked in enumerate(records): 

3006 type_num = unpacked.pack_type_num 

3007 if progress is not None and i % 1000 == 0: 

3008 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii")) 

3009 raw: Union[list[bytes], tuple[int, list[bytes]], tuple[bytes, list[bytes]]] 

3010 if unpacked.delta_base is not None: 

3011 assert isinstance(unpacked.delta_base, bytes), ( 

3012 f"Expected bytes, got {type(unpacked.delta_base)}" 

3013 ) 

3014 try: 

3015 base_offset, _base_crc32 = self.entries[unpacked.delta_base] 

3016 except KeyError: 

3017 type_num = REF_DELTA 

3018 assert isinstance(unpacked.delta_base, bytes) 

3019 raw = (unpacked.delta_base, unpacked.decomp_chunks) 

3020 else: 

3021 type_num = OFS_DELTA 

3022 raw = (offset - base_offset, unpacked.decomp_chunks) 

3023 else: 

3024 raw = unpacked.decomp_chunks 

3025 chunks: Union[list[bytes], Iterator[bytes]] 

3026 if unpacked.comp_chunks is not None and reuse_compressed: 

3027 chunks = unpacked.comp_chunks 

3028 else: 

3029 chunks = pack_object_chunks( 

3030 type_num, raw, compression_level=compression_level 

3031 ) 

3032 crc32 = 0 

3033 object_size = 0 

3034 for chunk in chunks: 

3035 yield chunk 

3036 crc32 = binascii.crc32(chunk, crc32) 

3037 self.cs.update(chunk) 

3038 object_size += len(chunk) 

3039 actual_num_records += 1 

3040 self.entries[unpacked.sha()] = (offset, crc32) 

3041 offset += object_size 

3042 if actual_num_records != num_records: 

3043 raise AssertionError( 

3044 f"actual records written differs: {actual_num_records} != {num_records}" 

3045 ) 

3046 

3047 yield self.cs.digest() 

3048 

3049 

3050def write_pack_data( 

3051 write: Union[ 

3052 Callable[[bytes], None], 

3053 Callable[[Union[bytes, bytearray, memoryview]], int], 

3054 IO[bytes], 

3055 ], 

3056 records: Iterator[UnpackedObject], 

3057 *, 

3058 num_records: Optional[int] = None, 

3059 progress: Optional[Callable[..., None]] = None, 

3060 compression_level: int = -1, 

3061) -> tuple[dict[bytes, tuple[int, int]], bytes]: 

3062 """Write a new pack data file. 

3063 

3064 Args: 

3065 write: Write function to use 

3066 num_records: Number of records (defaults to len(records) if None) 

3067 records: Iterator over UnpackedObject records to write 

3068 progress: Function to report progress to 

3069 compression_level: the zlib compression level 

3070 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3071 """ 

3072 chunk_generator = PackChunkGenerator( 

3073 num_records=num_records, 

3074 records=records, 

3075 progress=progress, 

3076 compression_level=compression_level, 

3077 ) 

3078 for chunk in chunk_generator: 

3079 if callable(write): 

3080 write(chunk) 

3081 else: 

3082 write.write(chunk) 

3083 return chunk_generator.entries, chunk_generator.sha1digest() 

3084 
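# Illustrative sketch (not part of dulwich): streaming pack data for a single
# full object into an in-memory buffer and collecting the offset/CRC entries
# needed to build an index afterwards.  The helper name is hypothetical.
def _example_write_pack_data() -> None:
    from dulwich.objects import Blob

    blob = Blob.from_string(b"hello\n")
    out = BytesIO()
    entries, pack_sha = write_pack_data(
        out.write, iter([full_unpacked_object(blob)]), num_records=1
    )
    offset, crc32 = entries[blob.sha().digest()]
    print(offset, crc32, sha_to_hex(pack_sha))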

3085 

3086def write_pack_index_v1( 

3087 f: IO[bytes], 

3088 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3089 pack_checksum: bytes, 

3090) -> bytes: 

3091 """Write a new pack index file. 

3092 

3093 Args: 

3094 f: A file-like object to write to 

3095 entries: List of tuples with object name (sha), offset_in_pack, 

3096 and crc32_checksum. 

3097 pack_checksum: Checksum of the pack file. 

3098 Returns: The SHA of the written index file 

3099 """ 

3100 f = SHA1Writer(f) 

3101 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3102 for name, _offset, _entry_checksum in entries: 

3103 fan_out_table[ord(name[:1])] += 1 

3104 # Fan-out table 

3105 for i in range(0x100): 

3106 f.write(struct.pack(">L", fan_out_table[i])) 

3107 fan_out_table[i + 1] += fan_out_table[i] 

3108 for name, offset, _entry_checksum in entries: 

3109 if not (offset <= 0xFFFFFFFF): 

3110 raise TypeError("pack format 1 only supports offsets < 2Gb") 

3111 f.write(struct.pack(">L20s", offset, name)) 

3112 assert len(pack_checksum) == 20 

3113 f.write(pack_checksum) 

3114 return f.write_sha() 

3115 

3116 

3117def _delta_encode_size(size: int) -> bytes: 

3118 ret = bytearray() 

3119 c = size & 0x7F 

3120 size >>= 7 

3121 while size: 

3122 ret.append(c | 0x80) 

3123 c = size & 0x7F 

3124 size >>= 7 

3125 ret.append(c) 

3126 return bytes(ret) 

3127 

3128 

3129# The length of delta compression copy operations in version 2 packs is limited 

3130# to 64K. To copy more, we use several copy operations. Version 3 packs allow 

3131# 24-bit lengths in copy operations, but we always make version 2 packs. 

3132_MAX_COPY_LEN = 0xFFFF 

3133 

3134 

3135def _encode_copy_operation(start: int, length: int) -> bytes: 

3136 scratch = bytearray([0x80]) 

3137 for i in range(4): 

3138 if start & 0xFF << i * 8: 

3139 scratch.append((start >> i * 8) & 0xFF) 

3140 scratch[0] |= 1 << i 

3141 for i in range(2): 

3142 if length & 0xFF << i * 8: 

3143 scratch.append((length >> i * 8) & 0xFF) 

3144 scratch[0] |= 1 << (4 + i) 

3145 return bytes(scratch) 

3146 
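# Illustrative worked example (not part of dulwich): a copy opcode sets bit 7,
# one flag bit per non-zero offset byte (bits 0-3) and length byte (bits 4-5),
# then emits those bytes little-endian.  Copying 0x1000 bytes from offset
# 0x0200 needs the second offset byte (0x02) and the second length byte (0x10).
def _example_copy_opcode() -> None:
    op = _encode_copy_operation(0x0200, 0x1000)
    assert op == bytes([0x80 | 0x02 | 0x20, 0x02, 0x10])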

3147 

3148def _create_delta_py(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3149 """Use python difflib to work out how to transform base_buf to target_buf. 

3150 

3151 Args: 

3152 base_buf: Base buffer 

3153 target_buf: Target buffer 

3154 """ 

3155 if isinstance(base_buf, list): 

3156 base_buf = b"".join(base_buf) 

3157 if isinstance(target_buf, list): 

3158 target_buf = b"".join(target_buf) 

3159 assert isinstance(base_buf, bytes) 

3160 assert isinstance(target_buf, bytes) 

3161 # write delta header 

3162 yield _delta_encode_size(len(base_buf)) 

3163 yield _delta_encode_size(len(target_buf)) 

3164 # write out delta opcodes 

3165 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf) 

3166 for opcode, i1, i2, j1, j2 in seq.get_opcodes(): 

3167 # Git patch opcodes don't care about deletes! 

3168 # if opcode == 'replace' or opcode == 'delete': 

3169 # pass 

3170 if opcode == "equal": 

3171 # If they are equal, unpacker will use data from base_buf 

3172 # Write out an opcode that says what range to use 

3173 copy_start = i1 

3174 copy_len = i2 - i1 

3175 while copy_len > 0: 

3176 to_copy = min(copy_len, _MAX_COPY_LEN) 

3177 yield _encode_copy_operation(copy_start, to_copy) 

3178 copy_start += to_copy 

3179 copy_len -= to_copy 

3180 if opcode == "replace" or opcode == "insert": 

3181 # If we are replacing a range or adding one, then we just 

3182 # output it to the stream (prefixed by its size) 

3183 s = j2 - j1 

3184 o = j1 

3185 while s > 127: 

3186 yield bytes([127]) 

3187 yield bytes(memoryview(target_buf)[o : o + 127]) 

3188 s -= 127 

3189 o += 127 

3190 yield bytes([s]) 

3191 yield bytes(memoryview(target_buf)[o : o + s]) 

3192 

3193 

3194# Default to pure Python implementation 

3195create_delta = _create_delta_py 

3196 

3197 

3198def apply_delta( 

3199 src_buf: Union[bytes, list[bytes]], delta: Union[bytes, list[bytes]] 

3200) -> list[bytes]: 

3201 """Based on the similar function in git's patch-delta.c. 

3202 

3203 Args: 

3204 src_buf: Source buffer 

3205 delta: Delta instructions 

3206 """ 

3207 if not isinstance(src_buf, bytes): 

3208 src_buf = b"".join(src_buf) 

3209 if not isinstance(delta, bytes): 

3210 delta = b"".join(delta) 

3211 out = [] 

3212 index = 0 

3213 delta_length = len(delta) 

3214 

3215 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]: 

3216 size = 0 

3217 i = 0 

3218 while delta: 

3219 cmd = ord(delta[index : index + 1]) 

3220 index += 1 

3221 size |= (cmd & ~0x80) << i 

3222 i += 7 

3223 if not cmd & 0x80: 

3224 break 

3225 return size, index 

3226 

3227 src_size, index = get_delta_header_size(delta, index) 

3228 dest_size, index = get_delta_header_size(delta, index) 

3229 if src_size != len(src_buf): 

3230 raise ApplyDeltaError( 

3231 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}" 

3232 ) 

3233 while index < delta_length: 

3234 cmd = ord(delta[index : index + 1]) 

3235 index += 1 

3236 if cmd & 0x80: 

3237 cp_off = 0 

3238 for i in range(4): 

3239 if cmd & (1 << i): 

3240 x = ord(delta[index : index + 1]) 

3241 index += 1 

3242 cp_off |= x << (i * 8) 

3243 cp_size = 0 

3244 # Version 3 packs can contain copy sizes larger than 64K. 

3245 for i in range(3): 

3246 if cmd & (1 << (4 + i)): 

3247 x = ord(delta[index : index + 1]) 

3248 index += 1 

3249 cp_size |= x << (i * 8) 

3250 if cp_size == 0: 

3251 cp_size = 0x10000 

3252 if ( 

3253 cp_off + cp_size < cp_size 

3254 or cp_off + cp_size > src_size 

3255 or cp_size > dest_size 

3256 ): 

3257 break 

3258 out.append(src_buf[cp_off : cp_off + cp_size]) 

3259 elif cmd != 0: 

3260 out.append(delta[index : index + cmd]) 

3261 index += cmd 

3262 else: 

3263 raise ApplyDeltaError("Invalid opcode 0") 

3264 

3265 if index != delta_length: 

3266 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}") 

3267 

3268 if dest_size != chunks_length(out): 

3269 raise ApplyDeltaError("dest size incorrect") 

3270 

3271 return out 

3272 
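# Illustrative sketch (not part of dulwich): a delta round trip.  create_delta
# yields instruction chunks that apply_delta expands back into the target
# buffer.  The helper name is hypothetical.
def _example_delta_round_trip() -> None:
    base = b"the quick brown fox\n" * 10
    target = b"the quick brown fox\n" * 9 + b"jumps over the lazy dog\n"
    delta = b"".join(create_delta(base, target))
    assert b"".join(apply_delta(base, delta)) == target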

3273 

3274def write_pack_index_v2( 

3275 f: IO[bytes], 

3276 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3277 pack_checksum: bytes, 

3278) -> bytes: 

3279 """Write a new pack index file. 

3280 

3281 Args: 

3282 f: File-like object to write to 

3283 entries: List of tuples with object name (sha), offset_in_pack, and 

3284 crc32_checksum. 

3285 pack_checksum: Checksum of the pack file. 

3286 Returns: The SHA of the index file written 

3287 """ 

3288 f = SHA1Writer(f) 

3289 f.write(b"\377tOc") # Magic! 

3290 f.write(struct.pack(">L", 2)) 

3291 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3292 for name, offset, entry_checksum in entries: 

3293 fan_out_table[ord(name[:1])] += 1 

3294 # Fan-out table 

3295 largetable: list[int] = [] 

3296 for i in range(0x100): 

3297 f.write(struct.pack(b">L", fan_out_table[i])) 

3298 fan_out_table[i + 1] += fan_out_table[i] 

3299 for name, offset, entry_checksum in entries: 

3300 f.write(name) 

3301 for name, offset, entry_checksum in entries: 

3302 f.write(struct.pack(b">L", entry_checksum)) 

3303 for name, offset, entry_checksum in entries: 

3304 if offset < 2**31: 

3305 f.write(struct.pack(b">L", offset)) 

3306 else: 

3307 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3308 largetable.append(offset) 

3309 for offset in largetable: 

3310 f.write(struct.pack(b">Q", offset)) 

3311 assert len(pack_checksum) == 20 

3312 f.write(pack_checksum) 

3313 return f.write_sha() 

3314 

3315 

3316def write_pack_index_v3( 

3317 f: IO[bytes], 

3318 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3319 pack_checksum: bytes, 

3320 hash_algorithm: int = 1, 

3321) -> bytes: 

3322 """Write a new pack index file in v3 format. 

3323 

3324 Args: 

3325 f: File-like object to write to 

3326 entries: List of tuples with object name (sha), offset_in_pack, and 

3327 crc32_checksum. 

3328 pack_checksum: Checksum of the pack file. 

3329 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

3330 Returns: The SHA of the index file written 

3331 """ 

3332 if hash_algorithm == 1: 

3333 hash_size = 20 # SHA-1 

3334 writer_cls = SHA1Writer 

3335 elif hash_algorithm == 2: 

3336 hash_size = 32 # SHA-256 

3337 # TODO: Add SHA256Writer when SHA-256 support is implemented 

3338 raise NotImplementedError("SHA-256 support not yet implemented") 

3339 else: 

3340 raise ValueError(f"Unknown hash algorithm {hash_algorithm}") 

3341 

3342 # Convert entries to list to allow multiple iterations 

3343 entries_list = list(entries) 

3344 

3345 # Calculate shortest unambiguous prefix length for object names 

3346 # For now, use full hash size (this could be optimized) 

3347 shortened_oid_len = hash_size 

3348 

3349 f = writer_cls(f) 

3350 f.write(b"\377tOc") # Magic! 

3351 f.write(struct.pack(">L", 3)) # Version 3 

3352 f.write(struct.pack(">L", hash_algorithm)) # Hash algorithm 

3353 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length 

3354 

3355 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3356 for name, offset, entry_checksum in entries_list: 

3357 if len(name) != hash_size: 

3358 raise ValueError( 

3359 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3360 ) 

3361 fan_out_table[ord(name[:1])] += 1 

3362 

3363 # Fan-out table 

3364 largetable: list[int] = [] 

3365 for i in range(0x100): 

3366 f.write(struct.pack(b">L", fan_out_table[i])) 

3367 fan_out_table[i + 1] += fan_out_table[i] 

3368 

3369 # Object names table 

3370 for name, offset, entry_checksum in entries_list: 

3371 f.write(name) 

3372 

3373 # CRC32 checksums table 

3374 for name, offset, entry_checksum in entries_list: 

3375 f.write(struct.pack(b">L", entry_checksum)) 

3376 

3377 # Offset table 

3378 for name, offset, entry_checksum in entries_list: 

3379 if offset < 2**31: 

3380 f.write(struct.pack(b">L", offset)) 

3381 else: 

3382 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3383 largetable.append(offset) 

3384 

3385 # Large offset table 

3386 for offset in largetable: 

3387 f.write(struct.pack(b">Q", offset)) 

3388 

3389 assert len(pack_checksum) == hash_size, ( 

3390 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}" 

3391 ) 

3392 f.write(pack_checksum) 

3393 return f.write_sha() 

3394 

3395 

3396def write_pack_index( 

3397 f: IO[bytes], 

3398 entries: Iterable[tuple[bytes, int, Union[int, None]]], 

3399 pack_checksum: bytes, 

3400 progress: Optional[Callable[..., None]] = None, 

3401 version: Optional[int] = None, 

3402) -> bytes: 

3403 """Write a pack index file. 

3404 

3405 Args: 

3406 f: File-like object to write to. 

3407 entries: List of tuples with object name (sha), offset_in_pack, and crc32_checksum 

3408 pack_checksum: Checksum of the pack file. 

3409 progress: Progress function (not currently used) 

3410 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION. 

3411 

3412 Returns: 

3413 SHA of the written index file 

3414 """ 

3415 if version is None: 

3416 version = DEFAULT_PACK_INDEX_VERSION 

3417 

3418 if version == 1: 

3419 return write_pack_index_v1(f, entries, pack_checksum) 

3420 elif version == 2: 

3421 return write_pack_index_v2(f, entries, pack_checksum) 

3422 elif version == 3: 

3423 return write_pack_index_v3(f, entries, pack_checksum) 

3424 else: 

3425 raise ValueError(f"Unsupported pack index version: {version}") 

3426 
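
# Illustrative usage (not part of dulwich): callers can pin an index version
# explicitly, e.g. write_pack_index(f, entries, pack_checksum, version=2),
# or pass version=None to fall back to DEFAULT_PACK_INDEX_VERSION.
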

3427 

3428class Pack: 

3429 """A Git pack object.""" 

3430 

3431 _data_load: Optional[Callable[[], PackData]] 

3432 _idx_load: Optional[Callable[[], PackIndex]] 

3433 

3434 _data: Optional[PackData] 

3435 _idx: Optional[PackIndex] 

3436 _bitmap: Optional["PackBitmap"] 

3437 

3438 def __init__( 

3439 self, 

3440 basename: str, 

3441 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

3442 *, 

3443 delta_window_size: Optional[int] = None, 

3444 window_memory: Optional[int] = None, 

3445 delta_cache_size: Optional[int] = None, 

3446 depth: Optional[int] = None, 

3447 threads: Optional[int] = None, 

3448 big_file_threshold: Optional[int] = None, 

3449 ) -> None: 

3450 """Initialize a Pack object. 

3451 

3452 Args: 

3453 basename: Base path for pack files (without .pack/.idx extension) 

3454 resolve_ext_ref: Optional function to resolve external references 

3455 delta_window_size: Size of the delta compression window 

3456 window_memory: Memory limit for delta compression window 

3457 delta_cache_size: Size of the delta cache 

3458 depth: Maximum depth for delta chains 

3459 threads: Number of threads to use for operations 

3460 big_file_threshold: Size threshold for big file handling 

3461 """ 

3462 self._basename = basename 

3463 self._data = None 

3464 self._idx = None 

3465 self._bitmap = None 

3466 self._idx_path = self._basename + ".idx" 

3467 self._data_path = self._basename + ".pack" 

3468 self._bitmap_path = self._basename + ".bitmap" 

3469 self.delta_window_size = delta_window_size 

3470 self.window_memory = window_memory 

3471 self.delta_cache_size = delta_cache_size 

3472 self.depth = depth 

3473 self.threads = threads 

3474 self.big_file_threshold = big_file_threshold 

3475 self._data_load = lambda: PackData( 

3476 self._data_path, 

3477 delta_window_size=delta_window_size, 

3478 window_memory=window_memory, 

3479 delta_cache_size=delta_cache_size, 

3480 depth=depth, 

3481 threads=threads, 

3482 big_file_threshold=big_file_threshold, 

3483 ) 

3484 self._idx_load = lambda: load_pack_index(self._idx_path) 

3485 self.resolve_ext_ref = resolve_ext_ref 
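
    # Illustrative usage (not part of dulwich): a Pack is normally opened from
    # the basename shared by its on-disk files, with the .pack, .idx and
    # .bitmap paths all derived from it, e.g.:
    #
    #     with Pack("objects/pack/pack-abc123") as p:
    #         if some_sha in p:
    #             obj = p[some_sha]  # a ShaFile instance
    #
    # where "objects/pack/pack-abc123" and some_sha are hypothetical.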

3486 

3487 @classmethod 

3488 def from_lazy_objects( 

3489 cls, data_fn: Callable[[], PackData], idx_fn: Callable[[], PackIndex] 

3490 ) -> "Pack": 

3491 """Create a new pack object from callables to load pack data and index objects.""" 

3492 ret = cls("") 

3493 ret._data_load = data_fn 

3494 ret._idx_load = idx_fn 

3495 return ret 

3496 

3497 @classmethod 

3498 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack": 

3499 """Create a new pack object from pack data and index objects.""" 

3500 ret = cls("") 

3501 ret._data = data 

3502 ret._data_load = None 

3503 ret._idx = idx 

3504 ret._idx_load = None 

3505 ret.check_length_and_checksum() 

3506 return ret 

3507 

3508 def name(self) -> bytes: 

3509 """The SHA over the SHAs of the objects in this pack.""" 

3510 return self.index.objects_sha1() 

3511 

3512 @property 

3513 def data(self) -> PackData: 

3514 """The pack data object being used.""" 

3515 if self._data is None: 

3516 assert self._data_load 

3517 self._data = self._data_load() 

3518 self.check_length_and_checksum() 

3519 return self._data 

3520 

3521 @property 

3522 def index(self) -> PackIndex: 

3523 """The index being used. 

3524 

3525 Note: This may be an in-memory index 

3526 """ 

3527 if self._idx is None: 

3528 assert self._idx_load 

3529 self._idx = self._idx_load() 

3530 return self._idx 

3531 

3532 @property 

3533 def bitmap(self) -> Optional["PackBitmap"]: 

3534 """The bitmap being used, if available. 

3535 

3536 Returns: 

3537 PackBitmap instance or None if no bitmap exists 

3538 

3539 Raises: 

3540 ValueError: If bitmap file is invalid or corrupt 

3541 """ 

3542 if self._bitmap is None: 

3543 from .bitmap import read_bitmap 

3544 

3545 self._bitmap = read_bitmap(self._bitmap_path, pack_index=self.index) 

3546 return self._bitmap 

3547 

3548 def close(self) -> None: 

3549 """Close the pack file and index.""" 

3550 if self._data is not None: 

3551 self._data.close() 

3552 if self._idx is not None: 

3553 self._idx.close() 

3554 

3555 def __enter__(self) -> "Pack": 

3556 """Enter context manager.""" 

3557 return self 

3558 

3559 def __exit__( 

3560 self, 

3561 exc_type: Optional[type], 

3562 exc_val: Optional[BaseException], 

3563 exc_tb: Optional[TracebackType], 

3564 ) -> None: 

3565 """Exit context manager.""" 

3566 self.close() 

3567 

3568 def __eq__(self, other: object) -> bool: 

3569 """Check equality with another pack.""" 

3570 if not isinstance(other, Pack): 

3571 return False 

3572 return self.index == other.index 

3573 

3574 def __len__(self) -> int: 

3575 """Number of entries in this pack.""" 

3576 return len(self.index) 

3577 

3578 def __repr__(self) -> str: 

3579 """Return string representation of this pack.""" 

3580 return f"{self.__class__.__name__}({self._basename!r})" 

3581 

3582 def __iter__(self) -> Iterator[bytes]: 

3583 """Iterate over all the sha1s of the objects in this pack.""" 

3584 return iter(self.index) 

3585 

3586 def check_length_and_checksum(self) -> None: 

3587 """Sanity check the length and checksum of the pack index and data.""" 

3588 assert len(self.index) == len(self.data), ( 

3589 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)" 

3590 ) 

3591 idx_stored_checksum = self.index.get_pack_checksum() 

3592 data_stored_checksum = self.data.get_stored_checksum() 

3593 if ( 

3594 idx_stored_checksum is not None 

3595 and idx_stored_checksum != data_stored_checksum 

3596 ): 

3597 raise ChecksumMismatch( 

3598 sha_to_hex(idx_stored_checksum), 

3599 sha_to_hex(data_stored_checksum), 

3600 ) 

3601 

3602 def check(self) -> None: 

3603 """Check the integrity of this pack. 

3604 

3605 Raises: 

3606 ChecksumMismatch: if a checksum for the index or data is wrong 

3607 """ 

3608 self.index.check() 

3609 self.data.check() 

3610 for obj in self.iterobjects(): 

3611 obj.check() 

3612 # TODO: object connectivity checks 

3613 

3614 def get_stored_checksum(self) -> bytes: 

3615 """Return the stored checksum of the pack data.""" 

3616 return self.data.get_stored_checksum() 

3617 

3618 def pack_tuples(self) -> list[tuple[ShaFile, None]]: 

3619 """Return pack tuples for all objects in pack.""" 

3620 return [(o, None) for o in self.iterobjects()] 

3621 

3622 def __contains__(self, sha1: bytes) -> bool: 

3623 """Check whether this pack contains a particular SHA1.""" 

3624 try: 

3625 self.index.object_offset(sha1) 

3626 return True 

3627 except KeyError: 

3628 return False 

3629 

3630 def get_raw(self, sha1: bytes) -> tuple[int, bytes]: 

3631 """Get raw object data by SHA1.""" 

3632 offset = self.index.object_offset(sha1) 

3633 obj_type, obj = self.data.get_object_at(offset) 

3634 type_num, chunks = self.resolve_object(offset, obj_type, obj) 

3635 return type_num, b"".join(chunks) # type: ignore[arg-type] 

3636 

3637 def __getitem__(self, sha1: bytes) -> ShaFile: 

3638 """Retrieve the specified SHA1.""" 

3639 type, uncomp = self.get_raw(sha1) 

3640 return ShaFile.from_raw_string(type, uncomp, sha=sha1) 

3641 

3642 def iterobjects(self) -> Iterator[ShaFile]: 

3643 """Iterate over the objects in this pack.""" 

3644 return iter( 

3645 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref) 

3646 ) 

3647 

3648 def iterobjects_subset( 

3649 self, shas: Iterable[ObjectID], *, allow_missing: bool = False 

3650 ) -> Iterator[ShaFile]: 

3651 """Iterate over a subset of objects in this pack.""" 

3652 return ( 

3653 uo 

3654 for uo in PackInflater.for_pack_subset( 

3655 self, 

3656 shas, 

3657 allow_missing=allow_missing, 

3658 resolve_ext_ref=self.resolve_ext_ref, 

3659 ) 

3660 if uo.id in shas 

3661 ) 

3662 

3663 def iter_unpacked_subset( 

3664 self, 

3665 shas: Iterable[ObjectID], 

3666 *, 

3667 include_comp: bool = False, 

3668 allow_missing: bool = False, 

3669 convert_ofs_delta: bool = False, 

3670 ) -> Iterator[UnpackedObject]: 

3671 """Iterate over unpacked objects in subset.""" 

3672 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list) 

3673 ofs: dict[int, bytes] = {} 

3674 todo = set(shas) 

3675 for unpacked in self.iter_unpacked(include_comp=include_comp): 

3676 sha = unpacked.sha() 

3677 if unpacked.offset is not None: 

3678 ofs[unpacked.offset] = sha 

3679 hexsha = sha_to_hex(sha) 

3680 if hexsha in todo: 

3681 if unpacked.pack_type_num == OFS_DELTA: 

3682 assert isinstance(unpacked.delta_base, int) 

3683 assert unpacked.offset is not None 

3684 base_offset = unpacked.offset - unpacked.delta_base 

3685 try: 

3686 unpacked.delta_base = ofs[base_offset] 

3687 except KeyError: 

3688 ofs_pending[base_offset].append(unpacked) 

3689 continue 

3690 else: 

3691 unpacked.pack_type_num = REF_DELTA 

3692 yield unpacked 

3693 todo.remove(hexsha) 

3694 if unpacked.offset is not None: 

3695 for child in ofs_pending.pop(unpacked.offset, []): 

3696 child.pack_type_num = REF_DELTA 

3697 child.delta_base = sha 

3698 yield child 

3699 assert not ofs_pending 

3700 if not allow_missing and todo: 

3701 raise UnresolvedDeltas(list(todo)) 
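
    # Descriptive note (not part of dulwich): an OFS_DELTA names its base by a
    # byte distance inside this pack, which is meaningless outside it.  The
    # loop above therefore rewrites such entries as REF_DELTAs once the base's
    # SHA is known, parking any delta whose base has not been seen yet in
    # ofs_pending until that base is yielded.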

3702 

3703 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]: 

3704 """Iterate over all unpacked objects in this pack.""" 

3705 ofs_to_entries = { 

3706 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries() 

3707 } 

3708 for unpacked in self.data.iter_unpacked(include_comp=include_comp): 

3709 assert unpacked.offset is not None 

3710 (sha, crc32) = ofs_to_entries[unpacked.offset] 

3711 unpacked._sha = sha 

3712 unpacked.crc32 = crc32 

3713 yield unpacked 

3714 

3715 def keep(self, msg: Optional[bytes] = None) -> str: 

3716 """Add a .keep file for the pack, preventing git from garbage collecting it. 

3717 

3718 Args: 

3719 msg: A message written inside the .keep file; can be used later 

3720 to determine whether or not a .keep file is obsolete. 

3721 Returns: The path of the .keep file, as a string. 

3722 """ 

3723 keepfile_name = f"{self._basename}.keep" 

3724 with GitFile(keepfile_name, "wb") as keepfile: 

3725 if msg: 

3726 keepfile.write(msg) 

3727 keepfile.write(b"\n") 

3728 return keepfile_name 
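
    # Illustrative usage (not part of dulwich): the marker file tells git's
    # garbage collection not to prune this pack, e.g.
    #
    #     keep_path = pack.keep(b"kept while a backup runs")
    #     os.remove(keep_path)  # later, once the pack may be collected again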

3729 

3730 def get_ref(self, sha: bytes) -> tuple[Optional[int], int, OldUnpackedObject]: 

3731 """Get the object for a ref SHA, only looking in this pack.""" 

3732 # TODO: cache these results 

3733 try: 

3734 offset = self.index.object_offset(sha) 

3735 except KeyError: 

3736 offset = None 

3737 if offset: 

3738 type, obj = self.data.get_object_at(offset) 

3739 elif self.resolve_ext_ref: 

3740 type, obj = self.resolve_ext_ref(sha) 

3741 else: 

3742 raise KeyError(sha) 

3743 return offset, type, obj 

3744 

3745 def resolve_object( 

3746 self, 

3747 offset: int, 

3748 type: int, 

3749 obj: OldUnpackedObject, 

3750 get_ref: Optional[ 

3751 Callable[[bytes], tuple[Optional[int], int, OldUnpackedObject]] 

3752 ] = None, 

3753 ) -> tuple[int, OldUnpackedObject]: 

3754 """Resolve an object, possibly resolving deltas when necessary. 

3755 

3756 Returns: Tuple with object type and contents. 

3757 """ 

3758 # Walk down the delta chain, building a stack of deltas to reach 

3759 # the requested object. 

3760 base_offset = offset 

3761 base_type = type 

3762 base_obj = obj 

3763 delta_stack = [] 

3764 while base_type in DELTA_TYPES: 

3765 prev_offset = base_offset 

3766 if get_ref is None: 

3767 get_ref = self.get_ref 

3768 if base_type == OFS_DELTA: 

3769 (delta_offset, delta) = base_obj 

3770 # TODO: clean up asserts and replace with nicer error messages 

3771 assert isinstance(delta_offset, int), ( 

3772 f"Expected int, got {delta_offset.__class__}" 

3773 ) 

3774 base_offset = base_offset - delta_offset 

3775 base_type, base_obj = self.data.get_object_at(base_offset) 

3776 assert isinstance(base_type, int) 

3777 elif base_type == REF_DELTA: 

3778 (basename, delta) = base_obj 

3779 assert isinstance(basename, bytes) and len(basename) == 20 

3780 base_offset, base_type, base_obj = get_ref(basename) # type: ignore[assignment] 

3781 assert isinstance(base_type, int) 

3782 if base_offset == prev_offset: # object is based on itself 

3783 raise UnresolvedDeltas([basename]) 

3784 delta_stack.append((prev_offset, base_type, delta)) 

3785 

3786 # Now grab the base object (mustn't be a delta) and apply the 

3787 # deltas all the way up the stack. 

3788 chunks = base_obj 

3789 for prev_offset, _delta_type, delta in reversed(delta_stack): 

3790 # Convert chunks to bytes for apply_delta if needed 

3791 if isinstance(chunks, list): 

3792 chunks_bytes = b"".join(chunks) 

3793 elif isinstance(chunks, tuple): 

3794 # For tuple type, second element is the actual data 

3795 _, chunk_data = chunks 

3796 if isinstance(chunk_data, list): 

3797 chunks_bytes = b"".join(chunk_data) 

3798 else: 

3799 chunks_bytes = chunk_data 

3800 else: 

3801 chunks_bytes = chunks 

3802 

3803 # Apply delta and get result as list 

3804 chunks = apply_delta(chunks_bytes, delta) 

3805 

3806 if prev_offset is not None: 

3807 self.data._offset_cache[prev_offset] = base_type, chunks 

3808 return base_type, chunks 
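
    # Descriptive note (not part of dulwich): resolve_object() walks the delta
    # chain outward-in.  For an OFS_DELTA at offset 400 whose base lies 100
    # bytes earlier, the loop records (400, <type of base>, delta), loads the
    # object at offset 300, and repeats until a non-delta base is found; the
    # stacked deltas are then applied in reverse (base-first) order and the
    # intermediate results cached by offset.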

3809 

3810 def entries( 

3811 self, progress: Optional[Callable[[int, int], None]] = None 

3812 ) -> Iterator[PackIndexEntry]: 

3813 """Yield entries summarizing the contents of this pack. 

3814 

3815 Args: 

3816 progress: Progress function, called with current and total 

3817 object count. 

3818 Returns: iterator of tuples with (sha, offset, crc32) 

3819 """ 

3820 return self.data.iterentries( 

3821 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3822 ) 

3823 

3824 def sorted_entries( 

3825 self, progress: Optional[ProgressFn] = None 

3826 ) -> Iterator[PackIndexEntry]: 

3827 """Return entries in this pack, sorted by SHA. 

3828 

3829 Args: 

3830 progress: Progress function, called with current and total 

3831 object count 

3832 Returns: Iterator of tuples with (sha, offset, crc32) 

3833 """ 

3834 return iter( 

3835 self.data.sorted_entries( 

3836 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3837 ) 

3838 ) 

3839 

3840 def get_unpacked_object( 

3841 self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True 

3842 ) -> UnpackedObject: 

3843 """Get the unpacked object for a sha. 

3844 

3845 Args: 

3846 sha: SHA of object to fetch 

3847 include_comp: Whether to include compression data in UnpackedObject 

3848 convert_ofs_delta: Whether to convert offset deltas to ref deltas 

3849 """ 

3850 offset = self.index.object_offset(sha) 

3851 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp) 

3852 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta: 

3853 assert isinstance(unpacked.delta_base, int) 

3854 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base) 

3855 unpacked.pack_type_num = REF_DELTA 

3856 return unpacked 
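
    # Descriptive note (not part of dulwich): converting OFS_DELTA to
    # REF_DELTA here replaces the pack-relative base offset with the base's
    # SHA, so the returned UnpackedObject stays meaningful when copied into
    # another pack or sent over the wire.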

3857 

3858 

3859def extend_pack( 

3860 f: BinaryIO, 

3861 object_ids: Set[ObjectID], 

3862 get_raw: Callable[[ObjectID], tuple[int, bytes]], 

3863 *, 

3864 compression_level: int = -1, 

3865 progress: Optional[Callable[[bytes], None]] = None, 

3866) -> tuple[bytes, list[tuple[bytes, int, int]]]: 

3867 """Extend a pack file with more objects. 

3868 

3869 The caller should make sure that object_ids does not contain any objects 

3870 that are already in the pack. 

3871 """ 

3872 # Update the header with the new number of objects. 

3873 f.seek(0) 

3874 _version, num_objects = read_pack_header(f.read) 

3875 

3876 if object_ids: 

3877 f.seek(0) 

3878 write_pack_header(f.write, num_objects + len(object_ids)) 

3879 

3880 # Must flush before reading (http://bugs.python.org/issue3207) 

3881 f.flush() 

3882 

3883 # Rescan the rest of the pack, computing the SHA with the new header. 

3884 new_sha = compute_file_sha(f, end_ofs=-20) 

3885 

3886 # Must reposition before writing (http://bugs.python.org/issue3207) 

3887 f.seek(0, os.SEEK_CUR) 

3888 

3889 extra_entries = [] 

3890 

3891 # Complete the pack. 

3892 for i, object_id in enumerate(object_ids): 

3893 if progress is not None: 

3894 progress( 

3895 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii") 

3896 ) 

3897 assert len(object_id) == 20 

3898 type_num, data = get_raw(object_id) 

3899 offset = f.tell() 

3900 crc32 = write_pack_object( 

3901 f.write, 

3902 type_num, 

3903 [data], # Convert bytes to list[bytes] 

3904 sha=new_sha, 

3905 compression_level=compression_level, 

3906 ) 

3907 extra_entries.append((object_id, offset, crc32)) 

3908 pack_sha = new_sha.digest() 

3909 f.write(pack_sha) 

3910 return pack_sha, extra_entries 

3911 
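
# Illustrative sketch (not part of dulwich): appending externally resolved base
# objects to an existing on-disk pack.  The path, object set and get_raw
# callback below are hypothetical.
def _example_extend_pack(path: str, missing: set[bytes], get_raw) -> bytes:
    """Append *missing* objects to "<path>.pack" and return the new pack SHA."""
    with open(path + ".pack", "r+b") as f:
        new_sha, extra_entries = extend_pack(f, missing, get_raw)
    # The pack checksum changed, so any existing .idx no longer matches and
    # the index must be rebuilt from the updated pack data.
    return new_sha
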

3912 

3913try: 

3914 from dulwich._pack import ( # type: ignore 

3915 apply_delta, 

3916 bisect_find_sha, 

3917 ) 

3918except ImportError: 

3919 pass 

3920 

3921# Try to import the Rust version of create_delta 

3922try: 

3923 from dulwich._pack import create_delta as _create_delta_rs 

3924except ImportError: 

3925 pass 

3926else: 

3927 # Wrap the Rust version, which returns bytes, so it matches the Python API (an iterator of byte chunks) 

3928 def _create_delta_rs_wrapper(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3929 """Wrapper for Rust create_delta to match Python API.""" 

3930 yield _create_delta_rs(base_buf, target_buf) 

3931 

3932 create_delta = _create_delta_rs_wrapper
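
# Illustrative sketch (not part of dulwich): whichever create_delta is in use
# (pure Python or the wrapped Rust version), the chunks it yields can be
# replayed by apply_delta to rebuild the target from the base.
def _example_delta_roundtrip() -> None:
    """Check that a generated delta reconstructs the target buffer."""
    base = b"the quick brown fox"
    target = b"the quick brown fox jumps over the lazy dog"
    delta = b"".join(create_delta(base, target))
    assert b"".join(apply_delta(base, delta)) == target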