Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%


1607 statements  

1# pack.py -- For dealing with packed git objects. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Classes for dealing with packed git objects. 

24 

25A pack is a compact representation of a bunch of objects, stored 

26using deltas where possible. 

27 

28They have two parts, the pack file, which stores the data, and an index 

29that tells you where the data is. 

30 

31To find an object you look in all of the index files until you find a

32match for the object name. You then use the offset obtained from the index as a

33pointer into the corresponding packfile.

34""" 

35 

36import binascii 

37from collections import defaultdict, deque 

38from contextlib import suppress 

39from io import BytesIO, UnsupportedOperation 

40 

41try: 

42 from cdifflib import CSequenceMatcher as SequenceMatcher 

43except ModuleNotFoundError: 

44 from difflib import SequenceMatcher 

45 

46import os 

47import struct 

48import sys 

49import warnings 

50import zlib 

51from collections.abc import Iterable, Iterator, Sequence 

52from hashlib import sha1 

53from itertools import chain 

54from os import SEEK_CUR, SEEK_END 

55from struct import unpack_from 

56from types import TracebackType 

57from typing import ( 

58 IO, 

59 TYPE_CHECKING, 

60 Any, 

61 BinaryIO, 

62 Callable, 

63 Generic, 

64 Optional, 

65 Protocol, 

66 TypeVar, 

67 Union, 

68 cast, 

69) 

70 

71try: 

72 import mmap 

73except ImportError: 

74 has_mmap = False 

75else: 

76 has_mmap = True 

77 

78if TYPE_CHECKING: 

79 from _hashlib import HASH as HashObject 

80 

81 from .commit_graph import CommitGraph 

82 

83# For some reason the above try, except fails to set has_mmap = False for plan9 

84if sys.platform == "Plan9": 

85 has_mmap = False 

86 

87from . import replace_me 

88from .errors import ApplyDeltaError, ChecksumMismatch 

89from .file import GitFile, _GitFile 

90from .lru_cache import LRUSizeCache 

91from .objects import ObjectID, ShaFile, hex_to_sha, object_header, sha_to_hex 

92 

93OFS_DELTA = 6 

94REF_DELTA = 7 

95 

96DELTA_TYPES = (OFS_DELTA, REF_DELTA) 

97 

98 

99DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 

100 

101# Keep pack files under 16MB in memory, otherwise write them out to disk

102PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 

103 

104# Default pack index version to use when none is specified 

105DEFAULT_PACK_INDEX_VERSION = 2 

106 

107 

108OldUnpackedObject = Union[tuple[Union[bytes, int], list[bytes]], list[bytes]] 

109ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]] 

110ProgressFn = Callable[[int, str], None] 

111PackHint = tuple[int, Optional[bytes]] 

112 

113 

114class UnresolvedDeltas(Exception): 

115 """Delta objects could not be resolved.""" 

116 

117 def __init__(self, shas: list[bytes]) -> None: 

118 """Initialize UnresolvedDeltas exception. 

119 

120 Args: 

121 shas: List of SHA hashes for unresolved delta objects 

122 """ 

123 self.shas = shas 

124 

125 

126class ObjectContainer(Protocol): 

127 """Protocol for objects that can contain git objects.""" 

128 

129 def add_object(self, obj: ShaFile) -> None: 

130 """Add a single object to this object store.""" 

131 

132 def add_objects( 

133 self, 

134 objects: Sequence[tuple[ShaFile, Optional[str]]], 

135 progress: Optional[Callable[[str], None]] = None, 

136 ) -> Optional["Pack"]: 

137 """Add a set of objects to this object store. 

138 

139 Args: 

140 objects: Iterable over a list of (object, path) tuples 

141 progress: Progress callback for object insertion 

142 Returns: Optional Pack object of the objects written. 

143 """ 

144 

145 def __contains__(self, sha1: bytes) -> bool: 

146 """Check if a hex sha is present.""" 

147 

148 def __getitem__(self, sha1: bytes) -> ShaFile: 

149 """Retrieve an object.""" 

150 

151 def get_commit_graph(self) -> Optional["CommitGraph"]: 

152 """Get the commit graph for this object store. 

153 

154 Returns: 

155 CommitGraph object if available, None otherwise 

156 """ 

157 return None 

158 

159 

160class PackedObjectContainer(ObjectContainer): 

161 """Container for objects packed in a pack file.""" 

162 

163 def get_unpacked_object( 

164 self, sha1: bytes, *, include_comp: bool = False 

165 ) -> "UnpackedObject": 

166 """Get a raw unresolved object. 

167 

168 Args: 

169 sha1: SHA-1 hash of the object 

170 include_comp: Whether to include compressed data 

171 

172 Returns: 

173 UnpackedObject instance 

174 """ 

175 raise NotImplementedError(self.get_unpacked_object) 

176 

177 def iterobjects_subset( 

178 self, shas: Iterable[bytes], *, allow_missing: bool = False 

179 ) -> Iterator[ShaFile]: 

180 """Iterate over a subset of objects. 

181 

182 Args: 

183 shas: Iterable of object SHAs to retrieve 

184 allow_missing: If True, skip missing objects 

185 

186 Returns: 

187 Iterator of ShaFile objects 

188 """ 

189 raise NotImplementedError(self.iterobjects_subset) 

190 

191 def iter_unpacked_subset( 

192 self, 

193 shas: set[bytes], 

194 include_comp: bool = False, 

195 allow_missing: bool = False, 

196 convert_ofs_delta: bool = True, 

197 ) -> Iterator["UnpackedObject"]: 

198 """Iterate over unpacked objects from a subset of SHAs. 

199 

200 Args: 

201 shas: Set of object SHAs to retrieve 

202 include_comp: Include compressed data if True 

203 allow_missing: If True, skip missing objects 

204 convert_ofs_delta: If True, convert offset deltas to ref deltas 

205 

206 Returns: 

207 Iterator of UnpackedObject instances 

208 """ 

209 raise NotImplementedError(self.iter_unpacked_subset) 

210 

211 

212class UnpackedObjectStream: 

213 """Abstract base class for a stream of unpacked objects.""" 

214 

215 def __iter__(self) -> Iterator["UnpackedObject"]: 

216 """Iterate over unpacked objects.""" 

217 raise NotImplementedError(self.__iter__) 

218 

219 def __len__(self) -> int: 

220 """Return the number of objects in the stream.""" 

221 raise NotImplementedError(self.__len__) 

222 

223 

224def take_msb_bytes( 

225 read: Callable[[int], bytes], crc32: Optional[int] = None 

226) -> tuple[list[int], Optional[int]]: 

227 """Read bytes marked with most significant bit. 

228 

229 Args: 

230 read: Read function 

231 crc32: Optional CRC32 checksum to update 

232 

233 Returns: 

234 Tuple of (list of bytes read, updated CRC32 or None) 

235 """ 

236 ret: list[int] = [] 

237 while len(ret) == 0 or ret[-1] & 0x80: 

238 b = read(1) 

239 if crc32 is not None: 

240 crc32 = binascii.crc32(b, crc32) 

241 ret.append(ord(b[:1])) 

242 return ret, crc32 
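# Illustrative sketch (not part of pack.py): take_msb_bytes() keeps reading one
# byte at a time while the most significant bit is set. 0x91 has the MSB set,
# 0x05 does not, so exactly two bytes are consumed here.
from io import BytesIO

from dulwich.pack import take_msb_bytes

buf = BytesIO(bytes([0x91, 0x05, 0xFF]))
ret, crc = take_msb_bytes(buf.read)
assert ret == [0x91, 0x05] and crc is None
assert buf.read() == b"\xff"  # remaining data is left untouched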

243 

244 

245class PackFileDisappeared(Exception): 

246 """Raised when a pack file unexpectedly disappears.""" 

247 

248 def __init__(self, obj: object) -> None: 

249 """Initialize PackFileDisappeared exception. 

250 

251 Args: 

252 obj: The object that triggered the exception 

253 """ 

254 self.obj = obj 

255 

256 

257class UnpackedObject: 

258 """Class encapsulating an object unpacked from a pack file. 

259 

260 These objects should only be created from within unpack_object. Most 

261 members start out as empty and are filled in at various points by 

262 read_zlib_chunks, unpack_object, DeltaChainIterator, etc. 

263 

264 End users of this object should take care that the function they're getting 

265 this object from is guaranteed to set the members they need. 

266 """ 

267 

268 __slots__ = [ 

269 "_sha", # Cached binary SHA. 

270 "comp_chunks", # Compressed object chunks. 

271 "crc32", # CRC32. 

272 "decomp_chunks", # Decompressed object chunks. 

273 "decomp_len", # Decompressed length of this object. 

274 "delta_base", # Delta base offset or SHA. 

275 "obj_chunks", # Decompressed and delta-resolved chunks. 

276 "obj_type_num", # Type of this object. 

277 "offset", # Offset in its pack. 

278 "pack_type_num", # Type of this object in the pack (may be a delta). 

279 ] 

280 

281 obj_type_num: Optional[int] 

282 obj_chunks: Optional[list[bytes]] 

283 delta_base: Union[None, bytes, int] 

284 decomp_chunks: list[bytes] 

285 comp_chunks: Optional[list[bytes]] 

286 decomp_len: Optional[int] 

287 crc32: Optional[int] 

288 offset: Optional[int] 

289 pack_type_num: int 

290 _sha: Optional[bytes] 

291 

292 # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be 

293 # methods of this object. 

294 def __init__( 

295 self, 

296 pack_type_num: int, 

297 *, 

298 delta_base: Union[None, bytes, int] = None, 

299 decomp_len: Optional[int] = None, 

300 crc32: Optional[int] = None, 

301 sha: Optional[bytes] = None, 

302 decomp_chunks: Optional[list[bytes]] = None, 

303 offset: Optional[int] = None, 

304 ) -> None: 

305 """Initialize an UnpackedObject. 

306 

307 Args: 

308 pack_type_num: Type number of this object in the pack 

309 delta_base: Delta base (offset or SHA) if this is a delta object 

310 decomp_len: Decompressed length of this object 

311 crc32: CRC32 checksum 

312 sha: SHA-1 hash of the object 

313 decomp_chunks: Decompressed chunks 

314 offset: Offset in the pack file 

315 """ 

316 self.offset = offset 

317 self._sha = sha 

318 self.pack_type_num = pack_type_num 

319 self.delta_base = delta_base 

320 self.comp_chunks = None 

321 self.decomp_chunks: list[bytes] = decomp_chunks or [] 

322 if decomp_chunks is not None and decomp_len is None: 

323 self.decomp_len = sum(map(len, decomp_chunks)) 

324 else: 

325 self.decomp_len = decomp_len 

326 self.crc32 = crc32 

327 

328 if pack_type_num in DELTA_TYPES: 

329 self.obj_type_num = None 

330 self.obj_chunks = None 

331 else: 

332 self.obj_type_num = pack_type_num 

333 self.obj_chunks = self.decomp_chunks 

334 self.delta_base = delta_base 

335 

336 def sha(self) -> bytes: 

337 """Return the binary SHA of this object.""" 

338 if self._sha is None: 

339 assert self.obj_type_num is not None and self.obj_chunks is not None 

340 self._sha = obj_sha(self.obj_type_num, self.obj_chunks) 

341 return self._sha 

342 

343 def sha_file(self) -> ShaFile: 

344 """Return a ShaFile from this object.""" 

345 assert self.obj_type_num is not None and self.obj_chunks is not None 

346 return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks) 

347 

348 # Only provided for backwards compatibility with code that expects either 

349 # chunks or a delta tuple. 

350 def _obj(self) -> OldUnpackedObject: 

351 """Return the decompressed chunks, or (delta base, delta chunks).""" 

352 if self.pack_type_num in DELTA_TYPES: 

353 assert isinstance(self.delta_base, (bytes, int)) 

354 return (self.delta_base, self.decomp_chunks) 

355 else: 

356 return self.decomp_chunks 

357 

358 def __eq__(self, other: object) -> bool: 

359 """Check equality with another UnpackedObject.""" 

360 if not isinstance(other, UnpackedObject): 

361 return False 

362 for slot in self.__slots__: 

363 if getattr(self, slot) != getattr(other, slot): 

364 return False 

365 return True 

366 

367 def __ne__(self, other: object) -> bool: 

368 """Check inequality with another UnpackedObject.""" 

369 return not (self == other) 

370 

371 def __repr__(self) -> str: 

372 """Return string representation of this UnpackedObject.""" 

373 data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__] 

374 return "{}({})".format(self.__class__.__name__, ", ".join(data)) 

375 

376 

377_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance 

378 

379 

380def read_zlib_chunks( 

381 read_some: Callable[[int], bytes], 

382 unpacked: UnpackedObject, 

383 include_comp: bool = False, 

384 buffer_size: int = _ZLIB_BUFSIZE, 

385) -> bytes: 

386 """Read zlib data from a buffer. 

387 

388 This function requires that the buffer have additional data following the 

389 compressed data, which is guaranteed to be the case for git pack files. 

390 

391 Args: 

392 read_some: Read function that returns at least one byte, but may 

393 return less than the requested size. 

394 unpacked: An UnpackedObject to write result data to. If its crc32 

395 attr is not None, the CRC32 of the compressed bytes will be computed 

396 using this starting CRC32. 

397 After this function, will have the following attrs set: 

398 * comp_chunks (if include_comp is True) 

399 * decomp_chunks 

400 * decomp_len 

401 * crc32 

402 include_comp: If True, include compressed data in the result. 

403 buffer_size: Size of the read buffer. 

404 Returns: Leftover unused data from the decompression. 

405 

406 Raises: 

407 zlib.error: if a decompression error occurred. 

408 """ 

409 if unpacked.decomp_len is None or unpacked.decomp_len <= -1: 

410 raise ValueError("non-negative zlib data stream size expected") 

411 decomp_obj = zlib.decompressobj() 

412 

413 comp_chunks = [] 

414 decomp_chunks = unpacked.decomp_chunks 

415 decomp_len = 0 

416 crc32 = unpacked.crc32 

417 

418 while True: 

419 add = read_some(buffer_size) 

420 if not add: 

421 raise zlib.error("EOF before end of zlib stream") 

422 comp_chunks.append(add) 

423 decomp = decomp_obj.decompress(add) 

424 decomp_len += len(decomp) 

425 decomp_chunks.append(decomp) 

426 unused = decomp_obj.unused_data 

427 if unused: 

428 left = len(unused) 

429 if crc32 is not None: 

430 crc32 = binascii.crc32(add[:-left], crc32) 

431 if include_comp: 

432 comp_chunks[-1] = add[:-left] 

433 break 

434 elif crc32 is not None: 

435 crc32 = binascii.crc32(add, crc32) 

436 if crc32 is not None: 

437 crc32 &= 0xFFFFFFFF 

438 

439 if decomp_len != unpacked.decomp_len: 

440 raise zlib.error("decompressed data does not match expected size") 

441 

442 unpacked.crc32 = crc32 

443 if include_comp: 

444 unpacked.comp_chunks = comp_chunks 

445 return unused 
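# Illustrative sketch (not part of pack.py): read_zlib_chunks() tolerates (and
# returns) bytes following the deflate stream, which is how objects are laid
# out back to back in a pack. Type number 3 is a blob.
import zlib
from io import BytesIO

from dulwich.pack import UnpackedObject, read_zlib_chunks

payload = b"hello, pack"
stream = BytesIO(zlib.compress(payload) + b"TRAILING")
unpacked = UnpackedObject(3, decomp_len=len(payload))
unused = read_zlib_chunks(stream.read, unpacked)
assert b"".join(unpacked.decomp_chunks) == payload
assert unused == b"TRAILING"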

446 

447 

448def iter_sha1(iter: Iterable[bytes]) -> bytes: 

449 """Return the hexdigest of the SHA1 over a set of names. 

450 

451 Args: 

452 iter: Iterator over string objects 

453 Returns: 40-byte hex sha1 digest 

454 """ 

455 sha = sha1() 

456 for name in iter: 

457 sha.update(name) 

458 return sha.hexdigest().encode("ascii") 
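# Illustrative sketch (not part of pack.py): iter_sha1() simply hashes the
# concatenation of the names it is given.
from hashlib import sha1

from dulwich.pack import iter_sha1

assert iter_sha1([b"abc", b"def"]) == sha1(b"abcdef").hexdigest().encode("ascii")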

459 

460 

461def load_pack_index(path: Union[str, os.PathLike]) -> "PackIndex": 

462 """Load an index file by path. 

463 

464 Args: 

465 path: Path to the index file 

466 Returns: A PackIndex loaded from the given path 

467 """ 

468 with GitFile(path, "rb") as f: 

469 return load_pack_index_file(path, f) 

470 

471 

472def _load_file_contents( 

473 f: Union[IO[bytes], _GitFile], size: Optional[int] = None 

474) -> tuple[Union[bytes, Any], int]: 

475 """Load contents from a file, preferring mmap when possible. 

476 

477 Args: 

478 f: File-like object to load 

479 size: Expected size, or None to determine from file 

480 Returns: Tuple of (contents, size) 

481 """ 

482 try: 

483 fd = f.fileno() 

484 except (UnsupportedOperation, AttributeError): 

485 fd = None 

486 # Attempt to use mmap if possible 

487 if fd is not None: 

488 if size is None: 

489 size = os.fstat(fd).st_size 

490 if has_mmap: 

491 try: 

492 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ) 

493 except (OSError, ValueError): 

494 # Can't mmap - perhaps a socket or invalid file descriptor 

495 pass 

496 else: 

497 return contents, size 

498 contents_bytes = f.read() 

499 size = len(contents_bytes) 

500 return contents_bytes, size 

501 

502 

503def load_pack_index_file( 

504 path: Union[str, os.PathLike], f: Union[IO[bytes], _GitFile] 

505) -> "PackIndex": 

506 """Load an index file from a file-like object. 

507 

508 Args: 

509 path: Path for the index file 

510 f: File-like object 

511 Returns: A PackIndex loaded from the given file 

512 """ 

513 contents, size = _load_file_contents(f) 

514 if contents[:4] == b"\377tOc": 

515 version = struct.unpack(b">L", contents[4:8])[0] 

516 if version == 2: 

517 return PackIndex2(path, file=f, contents=contents, size=size) 

518 elif version == 3: 

519 return PackIndex3(path, file=f, contents=contents, size=size) 

520 else: 

521 raise KeyError(f"Unknown pack index format {version}") 

522 else: 

523 return PackIndex1(path, file=f, contents=contents, size=size) 

524 

525 

526def bisect_find_sha( 

527 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes] 

528) -> Optional[int]: 

529 """Find a SHA in a data blob with sorted SHAs. 

530 

531 Args: 

532 start: Start index of range to search 

533 end: End index of range to search 

534 sha: Sha to find 

535 unpack_name: Callback to retrieve SHA by index 

536 Returns: Index of the SHA, or None if it wasn't found 

537 """ 

538 assert start <= end 

539 while start <= end: 

540 i = (start + end) // 2 

541 file_sha = unpack_name(i) 

542 if file_sha < sha: 

543 start = i + 1 

544 elif file_sha > sha: 

545 end = i - 1 

546 else: 

547 return i 

548 return None 
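# Illustrative sketch (not part of pack.py): bisect_find_sha() binary-searches
# a sorted run of names through an index -> name callback.
from dulwich.pack import bisect_find_sha

names = [bytes([i]) * 20 for i in (1, 5, 9)]  # three fake, sorted 20-byte "SHAs"
assert bisect_find_sha(0, len(names) - 1, bytes([5]) * 20, lambda i: names[i]) == 1
assert bisect_find_sha(0, len(names) - 1, bytes([7]) * 20, lambda i: names[i]) is None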

549 

550 

551PackIndexEntry = tuple[bytes, int, Optional[int]] 

552 

553 

554class PackIndex: 

555 """An index in to a packfile. 

556 

557 Given a sha id of an object a pack index can tell you the location in the 

558 packfile of that object if it has it. 

559 """ 

560 

561 # Default to SHA-1 for backward compatibility 

562 hash_algorithm = 1 

563 hash_size = 20 

564 

565 def __eq__(self, other: object) -> bool: 

566 """Check equality with another PackIndex.""" 

567 if not isinstance(other, PackIndex): 

568 return False 

569 

570 for (name1, _, _), (name2, _, _) in zip( 

571 self.iterentries(), other.iterentries() 

572 ): 

573 if name1 != name2: 

574 return False 

575 return True 

576 

577 def __ne__(self, other: object) -> bool: 

578 """Check if this pack index is not equal to another.""" 

579 return not self.__eq__(other) 

580 

581 def __len__(self) -> int: 

582 """Return the number of entries in this pack index.""" 

583 raise NotImplementedError(self.__len__) 

584 

585 def __iter__(self) -> Iterator[bytes]: 

586 """Iterate over the SHAs in this pack.""" 

587 return map(sha_to_hex, self._itersha()) 

588 

589 def iterentries(self) -> Iterator[PackIndexEntry]: 

590 """Iterate over the entries in this pack index. 

591 

592 Returns: iterator over tuples with object name, offset in packfile and 

593 crc32 checksum. 

594 """ 

595 raise NotImplementedError(self.iterentries) 

596 

597 def get_pack_checksum(self) -> Optional[bytes]: 

598 """Return the SHA1 checksum stored for the corresponding packfile. 

599 

600 Returns: 20-byte binary digest, or None if not available 

601 """ 

602 raise NotImplementedError(self.get_pack_checksum) 

603 

604 @replace_me(since="0.21.0", remove_in="0.23.0") 

605 def object_index(self, sha: bytes) -> int: 

606 """Return the index for the given SHA. 

607 

608 Args: 

609 sha: SHA-1 hash 

610 

611 Returns: 

612 Index position 

613 """ 

614 return self.object_offset(sha) 

615 

616 def object_offset(self, sha: bytes) -> int: 

617 """Return the offset in to the corresponding packfile for the object. 

618 

619 Given the name of an object it will return the offset that object 

620 lives at within the corresponding pack file. If the pack file doesn't 

621 have the object then None will be returned. 

622 """ 

623 raise NotImplementedError(self.object_offset) 

624 

625 def object_sha1(self, index: int) -> bytes: 

626 """Return the SHA1 corresponding to the index in the pack file.""" 

627 for name, offset, _crc32 in self.iterentries(): 

628 if offset == index: 

629 return name 

630 else: 

631 raise KeyError(index) 

632 

633 def _object_offset(self, sha: bytes) -> int: 

634 """See object_offset. 

635 

636 Args: 

637 sha: A *binary* SHA string (20 bytes long).

638 """ 

639 raise NotImplementedError(self._object_offset) 

640 

641 def objects_sha1(self) -> bytes: 

642 """Return the hex SHA1 over all the shas of all objects in this pack. 

643 

644 Note: This is used for the filename of the pack. 

645 """ 

646 return iter_sha1(self._itersha()) 

647 

648 def _itersha(self) -> Iterator[bytes]: 

649 """Yield all the SHA1's of the objects in the index, sorted.""" 

650 raise NotImplementedError(self._itersha) 

651 

652 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

653 """Iterate over all SHA1s with the given prefix. 

654 

655 Args: 

656 prefix: Binary prefix to match 

657 Returns: Iterator of matching SHA1s 

658 """ 

659 # Default implementation for PackIndex classes that don't override 

660 for sha, _, _ in self.iterentries(): 

661 if sha.startswith(prefix): 

662 yield sha 

663 

664 def close(self) -> None: 

665 """Close any open files.""" 

666 

667 def check(self) -> None: 

668 """Check the consistency of this pack index.""" 

669 

670 

671class MemoryPackIndex(PackIndex): 

672 """Pack index that is stored entirely in memory.""" 

673 

674 def __init__( 

675 self, 

676 entries: list[tuple[bytes, int, Optional[int]]], 

677 pack_checksum: Optional[bytes] = None, 

678 ) -> None: 

679 """Create a new MemoryPackIndex. 

680 

681 Args: 

682 entries: Sequence of name, idx, crc32 (sorted) 

683 pack_checksum: Optional pack checksum 

684 """ 

685 self._by_sha = {} 

686 self._by_offset = {} 

687 for name, offset, _crc32 in entries: 

688 self._by_sha[name] = offset 

689 self._by_offset[offset] = name 

690 self._entries = entries 

691 self._pack_checksum = pack_checksum 

692 

693 def get_pack_checksum(self) -> Optional[bytes]: 

694 """Return the SHA checksum stored for the corresponding packfile.""" 

695 return self._pack_checksum 

696 

697 def __len__(self) -> int: 

698 """Return the number of entries in this pack index.""" 

699 return len(self._entries) 

700 

701 def object_offset(self, sha: bytes) -> int: 

702 """Return the offset for the given SHA. 

703 

704 Args: 

705 sha: SHA to look up (binary or hex) 

706 Returns: Offset in the pack file 

707 """ 

708 if len(sha) == 40: 

709 sha = hex_to_sha(sha) 

710 return self._by_sha[sha] 

711 

712 def object_sha1(self, offset: int) -> bytes: 

713 """Return the SHA1 for the object at the given offset.""" 

714 return self._by_offset[offset] 

715 

716 def _itersha(self) -> Iterator[bytes]: 

717 """Iterate over all SHA1s in the index.""" 

718 return iter(self._by_sha) 

719 

720 def iterentries(self) -> Iterator[PackIndexEntry]: 

721 """Iterate over all index entries.""" 

722 return iter(self._entries) 

723 

724 @classmethod 

725 def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex": 

726 """Create a MemoryPackIndex from a PackData object.""" 

727 return MemoryPackIndex( 

728 list(pack_data.sorted_entries()), pack_data.get_stored_checksum() 

729 ) 

730 

731 @classmethod 

732 def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex": 

733 """Create a copy of another PackIndex in memory.""" 

734 return cls(list(other_index.iterentries()), other_index.get_pack_checksum()) 
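# Illustrative sketch (not part of pack.py): a MemoryPackIndex built directly
# from (sha, offset, crc32) entries, already sorted by sha.
from dulwich.pack import MemoryPackIndex

entries = [(b"\x01" * 20, 12, None), (b"\xfe" * 20, 345, None)]
idx = MemoryPackIndex(entries)
assert len(idx) == 2
assert idx.object_offset(b"\x01" * 20) == 12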

735 

736 

737class FilePackIndex(PackIndex): 

738 """Pack index that is based on a file. 

739 

740 To look up an object it uses the fan-out table: the first 256 four-byte

741 entries are indexed by the first byte of the sha id. The value stored for a

742 given byte is the end of the group of names that share that starting byte;

743 the value stored for the preceding byte gives the start of the group.

744 The names are sorted by sha id within the group, so the start and end

745 offsets bound a binary search (bisect) that determines whether the value is

746 present.

747 """ 

748 

749 _fan_out_table: list[int] 

750 _file: Union[IO[bytes], _GitFile] 

751 

752 def __init__( 

753 self, 

754 filename: Union[str, os.PathLike], 

755 file: Optional[Union[IO[bytes], _GitFile]] = None, 

756 contents: Optional[Union[bytes, "mmap.mmap"]] = None, 

757 size: Optional[int] = None, 

758 ) -> None: 

759 """Create a pack index object. 

760 

761 Provide it with the name of the index file to consider, and it will map 

762 it whenever required. 

763 """ 

764 self._filename = filename 

765 # Take the size now, so it can be checked each time we map the file to 

766 # ensure that it hasn't changed. 

767 if file is None: 

768 self._file = GitFile(filename, "rb") 

769 else: 

770 self._file = file 

771 if contents is None: 

772 self._contents, self._size = _load_file_contents(self._file, size) 

773 else: 

774 self._contents = contents 

775 self._size = size if size is not None else len(contents) 

776 

777 @property 

778 def path(self) -> str: 

779 """Return the path to this index file.""" 

780 return os.fspath(self._filename) 

781 

782 def __eq__(self, other: object) -> bool: 

783 """Check equality with another FilePackIndex.""" 

784 # Quick optimization: 

785 if ( 

786 isinstance(other, FilePackIndex) 

787 and self._fan_out_table != other._fan_out_table 

788 ): 

789 return False 

790 

791 return super().__eq__(other) 

792 

793 def close(self) -> None: 

794 """Close the underlying file and any mmap.""" 

795 self._file.close() 

796 close_fn = getattr(self._contents, "close", None) 

797 if close_fn is not None: 

798 close_fn() 

799 

800 def __len__(self) -> int: 

801 """Return the number of entries in this pack index.""" 

802 return self._fan_out_table[-1] 

803 

804 def _unpack_entry(self, i: int) -> PackIndexEntry: 

805 """Unpack the i-th entry in the index file. 

806 

807 Returns: Tuple with object name (SHA), offset in pack file and CRC32 

808 checksum (if known). 

809 """ 

810 raise NotImplementedError(self._unpack_entry) 

811 

812 def _unpack_name(self, i: int) -> bytes: 

813 """Unpack the i-th name from the index file.""" 

814 raise NotImplementedError(self._unpack_name) 

815 

816 def _unpack_offset(self, i: int) -> int: 

817 """Unpack the i-th object offset from the index file.""" 

818 raise NotImplementedError(self._unpack_offset) 

819 

820 def _unpack_crc32_checksum(self, i: int) -> Optional[int]: 

821 """Unpack the crc32 checksum for the ith object from the index file.""" 

822 raise NotImplementedError(self._unpack_crc32_checksum) 

823 

824 def _itersha(self) -> Iterator[bytes]: 

825 """Iterate over all SHA1s in the index.""" 

826 for i in range(len(self)): 

827 yield self._unpack_name(i) 

828 

829 def iterentries(self) -> Iterator[PackIndexEntry]: 

830 """Iterate over the entries in this pack index. 

831 

832 Returns: iterator over tuples with object name, offset in packfile and 

833 crc32 checksum. 

834 """ 

835 for i in range(len(self)): 

836 yield self._unpack_entry(i) 

837 

838 def _read_fan_out_table(self, start_offset: int) -> list[int]: 

839 """Read the fan-out table from the index. 

840 

841 The fan-out table contains 256 entries mapping first byte values 

842 to the number of objects with SHA1s less than or equal to that byte. 

843 

844 Args: 

845 start_offset: Offset in the file where the fan-out table starts 

846 Returns: List of 256 integers 

847 """ 

848 ret = [] 

849 for i in range(0x100): 

850 fanout_entry = self._contents[ 

851 start_offset + i * 4 : start_offset + (i + 1) * 4 

852 ] 

853 ret.append(struct.unpack(">L", fanout_entry)[0]) 

854 return ret 

855 

856 def check(self) -> None: 

857 """Check that the stored checksum matches the actual checksum.""" 

858 actual = self.calculate_checksum() 

859 stored = self.get_stored_checksum() 

860 if actual != stored: 

861 raise ChecksumMismatch(stored, actual) 

862 

863 def calculate_checksum(self) -> bytes: 

864 """Calculate the SHA1 checksum over this pack index. 

865 

866 Returns: This is a 20-byte binary digest 

867 """ 

868 return sha1(self._contents[:-20]).digest() 

869 

870 def get_pack_checksum(self) -> bytes: 

871 """Return the SHA1 checksum stored for the corresponding packfile. 

872 

873 Returns: 20-byte binary digest 

874 """ 

875 return bytes(self._contents[-40:-20]) 

876 

877 def get_stored_checksum(self) -> bytes: 

878 """Return the SHA1 checksum stored for this index. 

879 

880 Returns: 20-byte binary digest 

881 """ 

882 return bytes(self._contents[-20:]) 

883 

884 def object_offset(self, sha: bytes) -> int: 

885 """Return the offset in to the corresponding packfile for the object. 

886 

887 Given the name of an object it will return the offset that object 

888 lives at within the corresponding pack file. If the pack file doesn't 

889 have the object then None will be returned. 

890 """ 

891 if len(sha) == 40: 

892 sha = hex_to_sha(sha) 

893 try: 

894 return self._object_offset(sha) 

895 except ValueError as exc: 

896 closed = getattr(self._contents, "closed", None) 

897 if closed in (None, True): 

898 raise PackFileDisappeared(self) from exc 

899 raise 

900 

901 def _object_offset(self, sha: bytes) -> int: 

902 """See object_offset. 

903 

904 Args: 

905 sha: A *binary* SHA string (20 bytes long).

906 """ 

907 assert len(sha) == 20 

908 idx = ord(sha[:1]) 

909 if idx == 0: 

910 start = 0 

911 else: 

912 start = self._fan_out_table[idx - 1] 

913 end = self._fan_out_table[idx] 

914 i = bisect_find_sha(start, end, sha, self._unpack_name) 

915 if i is None: 

916 raise KeyError(sha) 

917 return self._unpack_offset(i) 
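# Illustrative sketch (not part of pack.py): how the fan-out table bounds the
# bisect above. fan_out[b] counts objects whose first SHA byte is <= b, so the
# candidates for a SHA starting with 0x61 sit between fan_out[0x60] and
# fan_out[0x61]. The counts below are hypothetical.
fan_out = {0x60: 120, 0x61: 135}
start, end = fan_out[0x61 - 1], fan_out[0x61]
assert (start, end) == (120, 135)  # only entries 120..134 share first byte 0x61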

918 

919 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

920 """Iterate over all SHA1s with the given prefix.""" 

921 start = ord(prefix[:1]) 

922 if start == 0: 

923 start = 0 

924 else: 

925 start = self._fan_out_table[start - 1] 

926 end = ord(prefix[:1]) + 1 

927 if end == 0x100: 

928 end = len(self) 

929 else: 

930 end = self._fan_out_table[end] 

931 assert start <= end 

932 started = False 

933 for i in range(start, end): 

934 name: bytes = self._unpack_name(i) 

935 if name.startswith(prefix): 

936 yield name 

937 started = True 

938 elif started: 

939 break 

940 

941 

942class PackIndex1(FilePackIndex): 

943 """Version 1 Pack Index file.""" 

944 

945 def __init__( 

946 self, 

947 filename: Union[str, os.PathLike], 

948 file: Optional[Union[IO[bytes], _GitFile]] = None, 

949 contents: Optional[bytes] = None, 

950 size: Optional[int] = None, 

951 ) -> None: 

952 """Initialize a version 1 pack index. 

953 

954 Args: 

955 filename: Path to the index file 

956 file: Optional file object 

957 contents: Optional mmap'd contents 

958 size: Optional size of the index 

959 """ 

960 super().__init__(filename, file, contents, size) 

961 self.version = 1 

962 self._fan_out_table = self._read_fan_out_table(0) 

963 

964 def _unpack_entry(self, i: int) -> tuple[bytes, int, None]: 

965 (offset, name) = unpack_from(">L20s", self._contents, (0x100 * 4) + (i * 24)) 

966 return (name, offset, None) 

967 

968 def _unpack_name(self, i: int) -> bytes: 

969 offset = (0x100 * 4) + (i * 24) + 4 

970 return self._contents[offset : offset + 20] 

971 

972 def _unpack_offset(self, i: int) -> int: 

973 offset = (0x100 * 4) + (i * 24) 

974 return unpack_from(">L", self._contents, offset)[0] 

975 

976 def _unpack_crc32_checksum(self, i: int) -> None: 

977 # Not stored in v1 index files 

978 return None 

979 

980 

981class PackIndex2(FilePackIndex): 

982 """Version 2 Pack Index file.""" 

983 

984 def __init__( 

985 self, 

986 filename: Union[str, os.PathLike], 

987 file: Optional[Union[IO[bytes], _GitFile]] = None, 

988 contents: Optional[bytes] = None, 

989 size: Optional[int] = None, 

990 ) -> None: 

991 """Initialize a version 2 pack index. 

992 

993 Args: 

994 filename: Path to the index file 

995 file: Optional file object 

996 contents: Optional mmap'd contents 

997 size: Optional size of the index 

998 """ 

999 super().__init__(filename, file, contents, size) 

1000 if self._contents[:4] != b"\377tOc": 

1001 raise AssertionError("Not a v2 pack index file") 

1002 (self.version,) = unpack_from(b">L", self._contents, 4) 

1003 if self.version != 2: 

1004 raise AssertionError(f"Version was {self.version}") 

1005 self._fan_out_table = self._read_fan_out_table(8) 

1006 self._name_table_offset = 8 + 0x100 * 4 

1007 self._crc32_table_offset = self._name_table_offset + 20 * len(self) 

1008 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1009 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1010 self 

1011 ) 

1012 

1013 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1014 return ( 

1015 self._unpack_name(i), 

1016 self._unpack_offset(i), 

1017 self._unpack_crc32_checksum(i), 

1018 ) 

1019 

1020 def _unpack_name(self, i: int) -> bytes: 

1021 offset = self._name_table_offset + i * 20 

1022 return self._contents[offset : offset + 20] 

1023 

1024 def _unpack_offset(self, i: int) -> int: 

1025 offset = self._pack_offset_table_offset + i * 4 

1026 offset = unpack_from(">L", self._contents, offset)[0] 

1027 if offset & (2**31): 

1028 offset = self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1029 offset = unpack_from(">Q", self._contents, offset)[0] 

1030 return offset 

1031 

1032 def _unpack_crc32_checksum(self, i: int) -> int: 

1033 return unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1034 

1035 

1036class PackIndex3(FilePackIndex): 

1037 """Version 3 Pack Index file. 

1038 

1039 Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes). 

1040 """ 

1041 

1042 def __init__( 

1043 self, 

1044 filename: Union[str, os.PathLike], 

1045 file: Optional[Union[IO[bytes], _GitFile]] = None, 

1046 contents: Optional[bytes] = None, 

1047 size: Optional[int] = None, 

1048 ) -> None: 

1049 """Initialize a version 3 pack index. 

1050 

1051 Args: 

1052 filename: Path to the index file 

1053 file: Optional file object 

1054 contents: Optional mmap'd contents 

1055 size: Optional size of the index 

1056 """ 

1057 super().__init__(filename, file, contents, size) 

1058 if self._contents[:4] != b"\377tOc": 

1059 raise AssertionError("Not a v3 pack index file") 

1060 (self.version,) = unpack_from(b">L", self._contents, 4) 

1061 if self.version != 3: 

1062 raise AssertionError(f"Version was {self.version}") 

1063 

1064 # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1065 (self.hash_algorithm,) = unpack_from(b">L", self._contents, 8) 

1066 if self.hash_algorithm == 1: 

1067 self.hash_size = 20 # SHA-1 

1068 elif self.hash_algorithm == 2: 

1069 self.hash_size = 32 # SHA-256 

1070 else: 

1071 raise AssertionError(f"Unknown hash algorithm {self.hash_algorithm}") 

1072 

1073 # Read length of shortened object names 

1074 (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12) 

1075 

1076 # Calculate offsets based on variable hash size 

1077 self._fan_out_table = self._read_fan_out_table( 

1078 16 

1079 ) # After header (4 + 4 + 4 + 4) 

1080 self._name_table_offset = 16 + 0x100 * 4 

1081 self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self) 

1082 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1083 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1084 self 

1085 ) 

1086 

1087 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1088 return ( 

1089 self._unpack_name(i), 

1090 self._unpack_offset(i), 

1091 self._unpack_crc32_checksum(i), 

1092 ) 

1093 

1094 def _unpack_name(self, i: int) -> bytes: 

1095 offset = self._name_table_offset + i * self.hash_size 

1096 return self._contents[offset : offset + self.hash_size] 

1097 

1098 def _unpack_offset(self, i: int) -> int: 

1099 offset = self._pack_offset_table_offset + i * 4 

1100 offset = unpack_from(">L", self._contents, offset)[0] 

1101 if offset & (2**31): 

1102 offset = self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1103 offset = unpack_from(">Q", self._contents, offset)[0] 

1104 return offset 

1105 

1106 def _unpack_crc32_checksum(self, i: int) -> int: 

1107 return unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1108 

1109 

1110def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]: 

1111 """Read the header of a pack file. 

1112 

1113 Args: 

1114 read: Read function 

1115 Returns: Tuple of (pack version, number of objects).

1116 Raises AssertionError if the header is missing or malformed.

1117 """ 

1118 header = read(12) 

1119 if not header: 

1120 raise AssertionError("file too short to contain pack") 

1121 if header[:4] != b"PACK": 

1122 raise AssertionError(f"Invalid pack header {header!r}") 

1123 (version,) = unpack_from(b">L", header, 4) 

1124 if version not in (2, 3): 

1125 raise AssertionError(f"Version was {version}") 

1126 (num_objects,) = unpack_from(b">L", header, 8) 

1127 return (version, num_objects) 
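# Illustrative sketch (not part of pack.py): a pack header is the magic
# b"PACK" followed by a 4-byte big-endian version and a 4-byte big-endian
# object count.
import struct
from io import BytesIO

from dulwich.pack import read_pack_header

header = b"PACK" + struct.pack(">LL", 2, 3)  # version 2, 3 objects
assert read_pack_header(BytesIO(header).read) == (2, 3)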

1128 

1129 

1130def chunks_length(chunks: Union[bytes, Iterable[bytes]]) -> int: 

1131 """Get the total length of a sequence of chunks. 

1132 

1133 Args: 

1134 chunks: Either a single bytes object or an iterable of bytes 

1135 Returns: Total length in bytes 

1136 """ 

1137 if isinstance(chunks, bytes): 

1138 return len(chunks) 

1139 else: 

1140 return sum(map(len, chunks)) 

1141 

1142 

1143def unpack_object( 

1144 read_all: Callable[[int], bytes], 

1145 read_some: Optional[Callable[[int], bytes]] = None, 

1146 compute_crc32: bool = False, 

1147 include_comp: bool = False, 

1148 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1149) -> tuple[UnpackedObject, bytes]: 

1150 """Unpack a Git object. 

1151 

1152 Args: 

1153 read_all: Read function that blocks until the number of requested 

1154 bytes are read. 

1155 read_some: Read function that returns at least one byte, but may not 

1156 return the number of bytes requested. 

1157 compute_crc32: If True, compute the CRC32 of the compressed data. If 

1158 False, the returned CRC32 will be None. 

1159 include_comp: If True, include compressed data in the result. 

1160 zlib_bufsize: An optional buffer size for zlib operations. 

1161 Returns: A tuple of (unpacked, unused), where unused is the unused data 

1162 leftover from decompression, and unpacked in an UnpackedObject with 

1163 the following attrs set: 

1164 

1165 * obj_chunks (for non-delta types) 

1166 * pack_type_num 

1167 * delta_base (for delta types) 

1168 * comp_chunks (if include_comp is True) 

1169 * decomp_chunks 

1170 * decomp_len 

1171 * crc32 (if compute_crc32 is True) 

1172 """ 

1173 if read_some is None: 

1174 read_some = read_all 

1175 if compute_crc32: 

1176 crc32 = 0 

1177 else: 

1178 crc32 = None 

1179 

1180 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1181 type_num = (raw[0] >> 4) & 0x07 

1182 size = raw[0] & 0x0F 

1183 for i, byte in enumerate(raw[1:]): 

1184 size += (byte & 0x7F) << ((i * 7) + 4) 

1185 

1186 delta_base: Union[int, bytes, None] 

1187 raw_base = len(raw) 

1188 if type_num == OFS_DELTA: 

1189 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1190 raw_base += len(raw) 

1191 if raw[-1] & 0x80: 

1192 raise AssertionError 

1193 delta_base_offset = raw[0] & 0x7F 

1194 for byte in raw[1:]: 

1195 delta_base_offset += 1 

1196 delta_base_offset <<= 7 

1197 delta_base_offset += byte & 0x7F 

1198 delta_base = delta_base_offset 

1199 elif type_num == REF_DELTA: 

1200 delta_base_obj = read_all(20) 

1201 if crc32 is not None: 

1202 crc32 = binascii.crc32(delta_base_obj, crc32) 

1203 delta_base = delta_base_obj 

1204 raw_base += 20 

1205 else: 

1206 delta_base = None 

1207 

1208 unpacked = UnpackedObject( 

1209 type_num, delta_base=delta_base, decomp_len=size, crc32=crc32 

1210 ) 

1211 unused = read_zlib_chunks( 

1212 read_some, 

1213 unpacked, 

1214 buffer_size=zlib_bufsize, 

1215 include_comp=include_comp, 

1216 ) 

1217 return unpacked, unused 
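# Illustrative sketch (not part of pack.py): the two varint encodings decoded
# above, worked through by hand on hypothetical byte values.
#
# Type/size header: for the bytes 0x95 0x0A, the MSB of 0x95 is set so the
# header continues; bits 6-4 give the type and the remaining bits the size.
raw = [0x95, 0x0A]
type_num = (raw[0] >> 4) & 0x07      # 1 -> commit
size = raw[0] & 0x0F                 # low 4 bits of the size
for i, byte in enumerate(raw[1:]):
    size += (byte & 0x7F) << (i * 7 + 4)
assert (type_num, size) == (1, 165)  # a commit whose uncompressed size is 165

# OFS_DELTA base offset: for the bytes 0x91 0x2E, each continuation adds one
# before shifting, so the decoded value is ((0x11 + 1) << 7) + 0x2E == 2350,
# i.e. the delta base starts 2350 bytes before this object's own offset.
raw = [0x91, 0x2E]
offset = raw[0] & 0x7F
for byte in raw[1:]:
    offset = ((offset + 1) << 7) + (byte & 0x7F)
assert offset == 2350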

1218 

1219 

1220def _compute_object_size(value: tuple[int, Any]) -> int: 

1221 """Compute the size of a unresolved object for use with LRUSizeCache.""" 

1222 (num, obj) = value 

1223 if num in DELTA_TYPES: 

1224 return chunks_length(obj[1]) 

1225 return chunks_length(obj) 

1226 

1227 

1228class PackStreamReader: 

1229 """Class to read a pack stream. 

1230 

1231 The pack is read from a ReceivableProtocol using read() or recv() as 

1232 appropriate. 

1233 """ 

1234 

1235 def __init__( 

1236 self, 

1237 read_all: Callable[[int], bytes], 

1238 read_some: Optional[Callable[[int], bytes]] = None, 

1239 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1240 ) -> None: 

1241 """Initialize pack stream reader. 

1242 

1243 Args: 

1244 read_all: Function to read all requested bytes 

1245 read_some: Function to read some bytes (optional) 

1246 zlib_bufsize: Buffer size for zlib decompression 

1247 """ 

1248 self.read_all = read_all 

1249 if read_some is None: 

1250 self.read_some = read_all 

1251 else: 

1252 self.read_some = read_some 

1253 self.sha = sha1() 

1254 self._offset = 0 

1255 self._rbuf = BytesIO() 

1256 # trailer is a deque to avoid memory allocation on small reads 

1257 self._trailer: deque[int] = deque() 

1258 self._zlib_bufsize = zlib_bufsize 

1259 

1260 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1261 """Read up to size bytes using the given callback. 

1262 

1263 As a side effect, update the verifier's hash (excluding the last 20 

1264 bytes read). 

1265 

1266 Args: 

1267 read: The read callback to read from. 

1268 size: The maximum number of bytes to read; the particular 

1269 behavior is callback-specific. 

1270 Returns: Bytes read 

1271 """ 

1272 data = read(size) 

1273 

1274 # maintain a trailer of the last 20 bytes we've read 

1275 n = len(data) 

1276 self._offset += n 

1277 tn = len(self._trailer) 

1278 if n >= 20: 

1279 to_pop = tn 

1280 to_add = 20 

1281 else: 

1282 to_pop = max(n + tn - 20, 0) 

1283 to_add = n 

1284 self.sha.update( 

1285 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)])) 

1286 ) 

1287 self._trailer.extend(data[-to_add:]) 

1288 

1289 # hash everything but the trailer 

1290 self.sha.update(data[:-to_add]) 

1291 return data 

1292 

1293 def _buf_len(self) -> int: 

1294 buf = self._rbuf 

1295 start = buf.tell() 

1296 buf.seek(0, SEEK_END) 

1297 end = buf.tell() 

1298 buf.seek(start) 

1299 return end - start 

1300 

1301 @property 

1302 def offset(self) -> int: 

1303 """Return current offset in the stream.""" 

1304 return self._offset - self._buf_len() 

1305 

1306 def read(self, size: int) -> bytes: 

1307 """Read, blocking until size bytes are read.""" 

1308 buf_len = self._buf_len() 

1309 if buf_len >= size: 

1310 return self._rbuf.read(size) 

1311 buf_data = self._rbuf.read() 

1312 self._rbuf = BytesIO() 

1313 return buf_data + self._read(self.read_all, size - buf_len) 

1314 

1315 def recv(self, size: int) -> bytes: 

1316 """Read up to size bytes, blocking until one byte is read.""" 

1317 buf_len = self._buf_len() 

1318 if buf_len: 

1319 data = self._rbuf.read(size) 

1320 if size >= buf_len: 

1321 self._rbuf = BytesIO() 

1322 return data 

1323 return self._read(self.read_some, size) 

1324 

1325 def __len__(self) -> int: 

1326 """Return the number of objects in this pack.""" 

1327 return self._num_objects 

1328 

1329 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]: 

1330 """Read the objects in this pack file. 

1331 

1332 Args: 

1333 compute_crc32: If True, compute the CRC32 of the compressed 

1334 data. If False, the returned CRC32 will be None. 

1335 Returns: Iterator over UnpackedObjects with the following members set: 

1336 offset 

1337 obj_type_num 

1338 obj_chunks (for non-delta types) 

1339 delta_base (for delta types) 

1340 decomp_chunks 

1341 decomp_len 

1342 crc32 (if compute_crc32 is True) 

1343 

1344 Raises: 

1345 ChecksumMismatch: if the checksum of the pack contents does not 

1346 match the checksum in the pack trailer. 

1347 zlib.error: if an error occurred during zlib decompression. 

1348 IOError: if an error occurred writing to the output file. 

1349 """ 

1350 pack_version, self._num_objects = read_pack_header(self.read) 

1351 

1352 for _ in range(self._num_objects): 

1353 offset = self.offset 

1354 unpacked, unused = unpack_object( 

1355 self.read, 

1356 read_some=self.recv, 

1357 compute_crc32=compute_crc32, 

1358 zlib_bufsize=self._zlib_bufsize, 

1359 ) 

1360 unpacked.offset = offset 

1361 

1362 # prepend any unused data to current read buffer 

1363 buf = BytesIO() 

1364 buf.write(unused) 

1365 buf.write(self._rbuf.read()) 

1366 buf.seek(0) 

1367 self._rbuf = buf 

1368 

1369 yield unpacked 

1370 

1371 if self._buf_len() < 20: 

1372 # If the read buffer is full, then the last read() got the whole 

1373 # trailer off the wire. If not, it means there is still some of the 

1374 # trailer to read. We need to read() all 20 bytes; N come from the 

1375 # read buffer and (20 - N) come from the wire. 

1376 self.read(20) 

1377 

1378 pack_sha = bytearray(self._trailer) # type: ignore 

1379 if pack_sha != self.sha.digest(): 

1380 raise ChecksumMismatch(sha_to_hex(pack_sha), self.sha.hexdigest()) 
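# Illustrative sketch (not part of pack.py): streaming objects out of a pack.
# Normally the reader wraps a network protocol; here a plain file read is used
# and the path is hypothetical.
from dulwich.pack import PackStreamReader

with open("objects/pack/pack-deadbeef.pack", "rb") as f:
    reader = PackStreamReader(f.read)
    for unpacked in reader.read_objects(compute_crc32=True):
        print(unpacked.offset, unpacked.pack_type_num, unpacked.decomp_len)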

1381 

1382 

1383class PackStreamCopier(PackStreamReader): 

1384 """Class to verify a pack stream as it is being read. 

1385 

1386 The pack is read from a ReceivableProtocol using read() or recv() as 

1387 appropriate and written out to the given file-like object. 

1388 """ 

1389 

1390 def __init__( 

1391 self, 

1392 read_all: Callable, 

1393 read_some: Callable, 

1394 outfile: IO[bytes], 

1395 delta_iter: Optional["DeltaChainIterator"] = None, 

1396 ) -> None: 

1397 """Initialize the copier. 

1398 

1399 Args: 

1400 read_all: Read function that blocks until the number of 

1401 requested bytes are read. 

1402 read_some: Read function that returns at least one byte, but may 

1403 not return the number of bytes requested. 

1404 outfile: File-like object to write output through. 

1405 delta_iter: Optional DeltaChainIterator to record deltas as we 

1406 read them. 

1407 """ 

1408 super().__init__(read_all, read_some=read_some) 

1409 self.outfile = outfile 

1410 self._delta_iter = delta_iter 

1411 

1412 def _read(self, read: Callable, size: int) -> bytes: 

1413 """Read data from the read callback and write it to the file.""" 

1414 data = super()._read(read, size) 

1415 self.outfile.write(data) 

1416 return data 

1417 

1418 def verify(self, progress: Optional[Callable] = None) -> None: 

1419 """Verify a pack stream and write it to the output file. 

1420 

1421 See PackStreamReader.iterobjects for a list of exceptions this may 

1422 throw. 

1423 """ 

1424 i = 0 # default count of entries if read_objects() is empty 

1425 for i, unpacked in enumerate(self.read_objects()): 

1426 if self._delta_iter: 

1427 self._delta_iter.record(unpacked) 

1428 if progress is not None: 

1429 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii")) 

1430 if progress is not None: 

1431 progress(f"copied {i} pack entries\n".encode("ascii")) 

1432 

1433 

1434def obj_sha(type: int, chunks: Union[bytes, Iterable[bytes]]) -> bytes: 

1435 """Compute the SHA for a numeric type and object chunks.""" 

1436 sha = sha1() 

1437 sha.update(object_header(type, chunks_length(chunks))) 

1438 if isinstance(chunks, bytes): 

1439 sha.update(chunks) 

1440 else: 

1441 for chunk in chunks: 

1442 sha.update(chunk) 

1443 return sha.digest() 

1444 

1445 

1446def compute_file_sha( 

1447 f: IO[bytes], start_ofs: int = 0, end_ofs: int = 0, buffer_size: int = 1 << 16 

1448) -> "HashObject": 

1449 """Hash a portion of a file into a new SHA. 

1450 

1451 Args: 

1452 f: A file-like object to read from that supports seek(). 

1453 start_ofs: The offset in the file to start reading at. 

1454 end_ofs: The offset in the file to end reading at, relative to the 

1455 end of the file. 

1456 buffer_size: A buffer size for reading. 

1457 Returns: A new SHA object updated with data read from the file. 

1458 """ 

1459 sha = sha1() 

1460 f.seek(0, SEEK_END) 

1461 length = f.tell() 

1462 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length: 

1463 raise AssertionError( 

1464 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}" 

1465 ) 

1466 todo = length + end_ofs - start_ofs 

1467 f.seek(start_ofs) 

1468 while todo: 

1469 data = f.read(min(todo, buffer_size)) 

1470 sha.update(data) 

1471 todo -= len(data) 

1472 return sha 
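# Illustrative sketch (not part of pack.py): hashing everything but the last
# 20 bytes, which is how PackData.calculate_checksum() below verifies a pack.
from hashlib import sha1
from io import BytesIO

from dulwich.pack import compute_file_sha

fake_pack = b"PACK" + b"\x00" * 100 + b"\x01" * 20  # hypothetical body + trailer
digest = compute_file_sha(BytesIO(fake_pack), end_ofs=-20).digest()
assert digest == sha1(fake_pack[:-20]).digest()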

1473 

1474 

1475class PackData: 

1476 """The data contained in a packfile. 

1477 

1478 Pack files can be accessed both sequentially for exploding a pack, and 

1479 directly with the help of an index to retrieve a specific object. 

1480 

1481 The objects within are either complete or a delta against another. 

1482 

1483 The header is variable length. If the MSB of each byte is set then it 

1484 indicates that the subsequent byte is still part of the header. 

1485 For the first byte the next three MS bits are the type, which tells you the type

1486 of object, and whether it is a delta. The low four bits are the lowest bits of the

1487 size. For each subsequent byte the LS 7 bits are the next MS bits of the 

1488 size, i.e. the last byte of the header contains the MS bits of the size. 

1489 

1490 For the complete objects the data is stored as zlib deflated data. 

1491 The size in the header is the uncompressed object size, so to uncompress 

1492 you need to just keep feeding data to zlib until you get an object back, 

1493 or it errors on bad data. This is done here by just giving the complete 

1494 buffer from the start of the deflated object on. This is bad, but until I 

1495 get mmap sorted out it will have to do. 

1496 

1497 Currently there are no integrity checks done. Also no attempt is made to 

1498 try and detect the delta case, or a request for an object at the wrong 

1499 position. It will all just throw a zlib or KeyError. 

1500 """ 

1501 

1502 def __init__( 

1503 self, 

1504 filename: Union[str, os.PathLike], 

1505 file: Optional[IO[bytes]] = None, 

1506 size: Optional[int] = None, 

1507 *, 

1508 delta_window_size: Optional[int] = None, 

1509 window_memory: Optional[int] = None, 

1510 delta_cache_size: Optional[int] = None, 

1511 depth: Optional[int] = None, 

1512 threads: Optional[int] = None, 

1513 big_file_threshold: Optional[int] = None, 

1514 ) -> None: 

1515 """Create a PackData object representing the pack in the given filename. 

1516 

1517 The file must exist and stay readable until the object is disposed of. 

1518 It must also stay the same size. It will be mapped whenever needed. 

1519 

1520 Currently there is a restriction on the size of the pack as the python 

1521 mmap implementation is flawed. 

1522 """ 

1523 self._filename = filename 

1524 self._size = size 

1525 self._header_size = 12 

1526 self.delta_window_size = delta_window_size 

1527 self.window_memory = window_memory 

1528 self.delta_cache_size = delta_cache_size 

1529 self.depth = depth 

1530 self.threads = threads 

1531 self.big_file_threshold = big_file_threshold 

1532 self._file: IO[bytes] 

1533 

1534 if file is None: 

1535 self._file = GitFile(self._filename, "rb") 

1536 else: 

1537 self._file = file 

1538 (version, self._num_objects) = read_pack_header(self._file.read) 

1539 

1540 # Use delta_cache_size config if available, otherwise default 

1541 cache_size = delta_cache_size or (1024 * 1024 * 20) 

1542 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]]( 

1543 cache_size, compute_size=_compute_object_size 

1544 ) 

1545 

1546 @property 

1547 def filename(self) -> str: 

1548 """Get the filename of the pack file. 

1549 

1550 Returns: 

1551 Base filename without directory path 

1552 """ 

1553 return os.path.basename(self._filename) 

1554 

1555 @property 

1556 def path(self) -> Union[str, os.PathLike]: 

1557 """Get the full path of the pack file. 

1558 

1559 Returns: 

1560 Full path to the pack file 

1561 """ 

1562 return self._filename 

1563 

1564 @classmethod 

1565 def from_file(cls, file: IO[bytes], size: Optional[int] = None) -> "PackData": 

1566 """Create a PackData object from an open file. 

1567 

1568 Args: 

1569 file: Open file object 

1570 size: Optional file size 

1571 

1572 Returns: 

1573 PackData instance 

1574 """ 

1575 return cls(str(file), file=file, size=size) 

1576 

1577 @classmethod 

1578 def from_path(cls, path: Union[str, os.PathLike]) -> "PackData": 

1579 """Create a PackData object from a file path. 

1580 

1581 Args: 

1582 path: Path to the pack file 

1583 

1584 Returns: 

1585 PackData instance 

1586 """ 

1587 return cls(filename=path) 

1588 

1589 def close(self) -> None: 

1590 """Close the underlying pack file.""" 

1591 self._file.close() 

1592 

1593 def __enter__(self) -> "PackData": 

1594 """Enter context manager.""" 

1595 return self 

1596 

1597 def __exit__( 

1598 self, 

1599 exc_type: Optional[type], 

1600 exc_val: Optional[BaseException], 

1601 exc_tb: Optional[TracebackType], 

1602 ) -> None: 

1603 """Exit context manager.""" 

1604 self.close() 

1605 

1606 def __eq__(self, other: object) -> bool: 

1607 """Check equality with another object.""" 

1608 if isinstance(other, PackData): 

1609 return self.get_stored_checksum() == other.get_stored_checksum() 

1610 return False 

1611 

1612 def _get_size(self) -> int: 

1613 if self._size is not None: 

1614 return self._size 

1615 self._size = os.path.getsize(self._filename) 

1616 if self._size < self._header_size: 

1617 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})" 

1618 raise AssertionError(errmsg) 

1619 return self._size 

1620 

1621 def __len__(self) -> int: 

1622 """Returns the number of objects in this pack.""" 

1623 return self._num_objects 

1624 

1625 def calculate_checksum(self) -> bytes: 

1626 """Calculate the checksum for this pack. 

1627 

1628 Returns: 20-byte binary SHA1 digest 

1629 """ 

1630 return compute_file_sha(cast(IO[bytes], self._file), end_ofs=-20).digest() 

1631 

1632 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]: 

1633 """Iterate over unpacked objects in the pack.""" 

1634 self._file.seek(self._header_size) 

1635 

1636 if self._num_objects is None: 

1637 return 

1638 

1639 for _ in range(self._num_objects): 

1640 offset = self._file.tell() 

1641 unpacked, unused = unpack_object( 

1642 self._file.read, compute_crc32=False, include_comp=include_comp 

1643 ) 

1644 unpacked.offset = offset 

1645 yield unpacked 

1646 # Back up over unused data. 

1647 self._file.seek(-len(unused), SEEK_CUR) 

1648 

1649 def iterentries( 

1650 self, progress=None, resolve_ext_ref: Optional[ResolveExtRefFn] = None 

1651 ): 

1652 """Yield entries summarizing the contents of this pack. 

1653 

1654 Args: 

1655 progress: Progress function, called with current and total 

1656 object count. 

1657 resolve_ext_ref: Optional function to resolve external references 

1658 Returns: iterator of tuples with (sha, offset, crc32) 

1659 """ 

1660 num_objects = self._num_objects 

1661 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref) 

1662 for i, result in enumerate(indexer): 

1663 if progress is not None: 

1664 progress(i, num_objects) 

1665 yield result 

1666 

1667 def sorted_entries( 

1668 self, 

1669 progress: Optional[ProgressFn] = None, 

1670 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1671 ) -> list[tuple[bytes, int, int]]: 

1672 """Return entries in this pack, sorted by SHA. 

1673 

1674 Args: 

1675 progress: Progress function, called with current and total 

1676 object count 

1677 resolve_ext_ref: Optional function to resolve external references 

1678 Returns: List of tuples with (sha, offset, crc32) 

1679 """ 

1680 return sorted( 

1681 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) 

1682 ) 

1683 

1684 def create_index_v1( 

1685 self, 

1686 filename: str, 

1687 progress: Optional[Callable] = None, 

1688 resolve_ext_ref: Optional[Callable] = None, 

1689 ) -> bytes: 

1690 """Create a version 1 file for this data file. 

1691 

1692 Args: 

1693 filename: Index filename. 

1694 progress: Progress report function 

1695 resolve_ext_ref: Optional function to resolve external references 

1696 Returns: Checksum of index file 

1697 """ 

1698 entries = self.sorted_entries( 

1699 progress=progress, resolve_ext_ref=resolve_ext_ref 

1700 ) 

1701 checksum = self.calculate_checksum() 

1702 with GitFile(filename, "wb") as f: 

1703 write_pack_index_v1( 

1704 cast(BinaryIO, f), 

1705 cast(list[tuple[bytes, int, Optional[int]]], entries), 

1706 checksum, 

1707 ) 

1708 return checksum 

1709 

1710 def create_index_v2( 

1711 self, 

1712 filename: str, 

1713 progress: Optional[Callable] = None, 

1714 resolve_ext_ref: Optional[Callable] = None, 

1715 ) -> bytes: 

1716 """Create a version 2 index file for this data file. 

1717 

1718 Args: 

1719 filename: Index filename. 

1720 progress: Progress report function 

1721 resolve_ext_ref: Optional function to resolve external references 

1722 Returns: Checksum of index file 

1723 """ 

1724 entries = self.sorted_entries( 

1725 progress=progress, resolve_ext_ref=resolve_ext_ref 

1726 ) 

1727 with GitFile(filename, "wb") as f: 

1728 return write_pack_index_v2(f, entries, self.calculate_checksum()) 

1729 

1730 def create_index_v3( 

1731 self, 

1732 filename: str, 

1733 progress: Optional[Callable] = None, 

1734 resolve_ext_ref: Optional[Callable] = None, 

1735 hash_algorithm: int = 1, 

1736 ) -> bytes: 

1737 """Create a version 3 index file for this data file. 

1738 

1739 Args: 

1740 filename: Index filename. 

1741 progress: Progress report function 

1742 resolve_ext_ref: Function to resolve external references 

1743 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1744 Returns: Checksum of index file 

1745 """ 

1746 entries = self.sorted_entries( 

1747 progress=progress, resolve_ext_ref=resolve_ext_ref 

1748 ) 

1749 with GitFile(filename, "wb") as f: 

1750 return write_pack_index_v3( 

1751 f, entries, self.calculate_checksum(), hash_algorithm 

1752 ) 

1753 

1754 def create_index( 

1755 self, 

1756 filename: str, 

1757 progress: Optional[Callable] = None, 

1758 version: int = 2, 

1759 resolve_ext_ref: Optional[Callable] = None, 

1760 hash_algorithm: int = 1, 

1761 ) -> bytes: 

1762 """Create an index file for this data file. 

1763 

1764 Args: 

1765 filename: Index filename. 

1766 progress: Progress report function 

1767 version: Index version (1, 2, or 3) 

1768 resolve_ext_ref: Function to resolve external references 

1769 hash_algorithm: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256) 

1770 Returns: Checksum of index file 

1771 """ 

1772 if version == 1: 

1773 return self.create_index_v1( 

1774 filename, progress, resolve_ext_ref=resolve_ext_ref 

1775 ) 

1776 elif version == 2: 

1777 return self.create_index_v2( 

1778 filename, progress, resolve_ext_ref=resolve_ext_ref 

1779 ) 

1780 elif version == 3: 

1781 return self.create_index_v3( 

1782 filename, 

1783 progress, 

1784 resolve_ext_ref=resolve_ext_ref, 

1785 hash_algorithm=hash_algorithm, 

1786 ) 

1787 else: 

1788 raise ValueError(f"unknown index format {version}") 

1789 

1790 def get_stored_checksum(self) -> bytes: 

1791 """Return the expected checksum stored in this pack.""" 

1792 self._file.seek(-20, SEEK_END) 

1793 return self._file.read(20) 

1794 

1795 def check(self) -> None: 

1796 """Check the consistency of this pack.""" 

1797 actual = self.calculate_checksum() 

1798 stored = self.get_stored_checksum() 

1799 if actual != stored: 

1800 raise ChecksumMismatch(stored, actual) 

1801 

1802 def get_unpacked_object_at( 

1803 self, offset: int, *, include_comp: bool = False 

1804 ) -> UnpackedObject: 

1805 """Given offset in the packfile return a UnpackedObject.""" 

1806 assert offset >= self._header_size 

1807 self._file.seek(offset) 

1808 unpacked, _ = unpack_object(self._file.read, include_comp=include_comp) 

1809 unpacked.offset = offset 

1810 return unpacked 

1811 

1812 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]: 

1813 """Given an offset in to the packfile return the object that is there. 

1814 

1815 Using the associated index the location of an object can be looked up, 

1816 and then the packfile can be asked directly for that object using this 

1817 function. 

1818 """ 

1819 try: 

1820 return self._offset_cache[offset] 

1821 except KeyError: 

1822 pass 

1823 unpacked = self.get_unpacked_object_at(offset, include_comp=False) 

1824 return (unpacked.pack_type_num, unpacked._obj()) 

1825 

1826 
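# Illustrative usage (editor's sketch, not part of dulwich itself): open an
# on-disk pack data file, verify its trailing checksum and write a v2 index
# next to it.  The pack path below is hypothetical.
#
#     data = PackData.from_path("pack-abc123.pack")
#     try:
#         data.check()                                   # compares stored vs computed SHA-1
#         idx_sha = data.create_index("pack-abc123.idx", version=2)
#     finally:
#         data.close()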

1827T = TypeVar("T") 

1828 

1829 

1830class DeltaChainIterator(Generic[T]): 

1831 """Abstract iterator over pack data based on delta chains. 

1832 

1833 Each object in the pack is guaranteed to be inflated exactly once, 

1834 regardless of how many objects reference it as a delta base. As a result, 

1835 memory usage is proportional to the length of the longest delta chain. 

1836 

1837 Subclasses can override _result to define the result type of the iterator. 

1838 By default, results are UnpackedObjects with the following members set: 

1839 

1840 * offset 

1841 * obj_type_num 

1842 * obj_chunks 

1843 * pack_type_num 

1844 * delta_base (for delta types) 

1845 * comp_chunks (if _include_comp is True) 

1846 * decomp_chunks 

1847 * decomp_len 

1848 * crc32 (if _compute_crc32 is True) 

1849 """ 

1850 

1851 _compute_crc32 = False 

1852 _include_comp = False 

1853 

1854 def __init__( 

1855 self, 

1856 file_obj: Optional[BinaryIO], 

1857 *, 

1858 resolve_ext_ref: Optional[Callable] = None, 

1859 ) -> None: 

1860 """Initialize DeltaChainIterator. 

1861 

1862 Args: 

1863 file_obj: File object to read pack data from 

1864 resolve_ext_ref: Optional function to resolve external references 

1865 """ 

1866 self._file = file_obj 

1867 self._resolve_ext_ref = resolve_ext_ref 

1868 self._pending_ofs: dict[int, list[int]] = defaultdict(list) 

1869 self._pending_ref: dict[bytes, list[int]] = defaultdict(list) 

1870 self._full_ofs: list[tuple[int, int]] = [] 

1871 self._ext_refs: list[bytes] = [] 

1872 

1873 @classmethod 

1874 def for_pack_data( 

1875 cls, pack_data: PackData, resolve_ext_ref: Optional[Callable] = None 

1876 ) -> "DeltaChainIterator": 

1877 """Create a DeltaChainIterator from pack data. 

1878 

1879 Args: 

1880 pack_data: PackData object to iterate 

1881 resolve_ext_ref: Optional function to resolve external refs 

1882 

1883 Returns: 

1884 DeltaChainIterator instance 

1885 """ 

1886 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1887 walker.set_pack_data(pack_data) 

1888 for unpacked in pack_data.iter_unpacked(include_comp=False): 

1889 walker.record(unpacked) 

1890 return walker 

1891 

1892 @classmethod 

1893 def for_pack_subset( 

1894 cls, 

1895 pack: "Pack", 

1896 shas: Iterable[bytes], 

1897 *, 

1898 allow_missing: bool = False, 

1899 resolve_ext_ref: Optional[Callable] = None, 

1900 ) -> "DeltaChainIterator": 

1901 """Create a DeltaChainIterator for a subset of objects. 

1902 

1903 Args: 

1904 pack: Pack object containing the data 

1905 shas: Iterable of object SHAs to include 

1906 allow_missing: If True, skip missing objects 

1907 resolve_ext_ref: Optional function to resolve external refs 

1908 

1909 Returns: 

1910 DeltaChainIterator instance 

1911 """ 

1912 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1913 walker.set_pack_data(pack.data) 

1914 todo = set() 

1915 for sha in shas: 

1916 assert isinstance(sha, bytes) 

1917 try: 

1918 off = pack.index.object_offset(sha) 

1919 except KeyError: 

1920 if not allow_missing: 

1921 raise 

1922 else: 

1923 todo.add(off) 

1924 done = set() 

1925 while todo: 

1926 off = todo.pop() 

1927 unpacked = pack.data.get_unpacked_object_at(off) 

1928 walker.record(unpacked) 

1929 done.add(off) 

1930 base_ofs = None 

1931 if unpacked.pack_type_num == OFS_DELTA: 

1932 assert unpacked.offset is not None 

1933 assert unpacked.delta_base is not None 

1934 assert isinstance(unpacked.delta_base, int) 

1935 base_ofs = unpacked.offset - unpacked.delta_base 

1936 elif unpacked.pack_type_num == REF_DELTA: 

1937 with suppress(KeyError): 

1938 assert isinstance(unpacked.delta_base, bytes) 

1939 base_ofs = pack.index.object_index(unpacked.delta_base) 

1940 if base_ofs is not None and base_ofs not in done: 

1941 todo.add(base_ofs) 

1942 return walker 

1943 

1944 def record(self, unpacked: UnpackedObject) -> None: 

1945 """Record an unpacked object for later processing. 

1946 

1947 Args: 

1948 unpacked: UnpackedObject to record 

1949 """ 

1950 type_num = unpacked.pack_type_num 

1951 offset = unpacked.offset 

1952 assert offset is not None 

1953 if type_num == OFS_DELTA: 

1954 assert unpacked.delta_base is not None 

1955 assert isinstance(unpacked.delta_base, int) 

1956 base_offset = offset - unpacked.delta_base 

1957 self._pending_ofs[base_offset].append(offset) 

1958 elif type_num == REF_DELTA: 

1959 assert isinstance(unpacked.delta_base, bytes) 

1960 self._pending_ref[unpacked.delta_base].append(offset) 

1961 else: 

1962 self._full_ofs.append((offset, type_num)) 

1963 

1964 def set_pack_data(self, pack_data: PackData) -> None: 

1965 """Set the pack data for iteration. 

1966 

1967 Args: 

1968 pack_data: PackData object to use 

1969 """ 

1970 self._file = cast(BinaryIO, pack_data._file) 

1971 

1972 def _walk_all_chains(self) -> Iterator[T]: 

1973 for offset, type_num in self._full_ofs: 

1974 yield from self._follow_chain(offset, type_num, None) 

1975 yield from self._walk_ref_chains() 

1976 assert not self._pending_ofs, repr(self._pending_ofs) 

1977 

1978 def _ensure_no_pending(self) -> None: 

1979 if self._pending_ref: 

1980 raise UnresolvedDeltas([sha_to_hex(s) for s in self._pending_ref]) 

1981 

1982 def _walk_ref_chains(self) -> Iterator[T]: 

1983 if not self._resolve_ext_ref: 

1984 self._ensure_no_pending() 

1985 return 

1986 

1987 for base_sha, pending in sorted(self._pending_ref.items()): 

1988 if base_sha not in self._pending_ref: 

1989 continue 

1990 try: 

1991 type_num, chunks = self._resolve_ext_ref(base_sha) 

1992 except KeyError: 

1993 # Not an external ref, but may depend on one. Either it will 

1994 # get popped via a _follow_chain call, or we will raise an 

1995 # error below. 

1996 continue 

1997 self._ext_refs.append(base_sha) 

1998 self._pending_ref.pop(base_sha) 

1999 for new_offset in pending: 

2000 yield from self._follow_chain(new_offset, type_num, chunks) 

2001 

2002 self._ensure_no_pending() 

2003 

2004 def _result(self, unpacked: UnpackedObject) -> T: 

2005 raise NotImplementedError 

2006 

2007 def _resolve_object( 

2008 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2009 ) -> UnpackedObject: 

2010 assert self._file is not None 

2011 self._file.seek(offset) 

2012 unpacked, _ = unpack_object( 

2013 self._file.read, 

2014 include_comp=self._include_comp, 

2015 compute_crc32=self._compute_crc32, 

2016 ) 

2017 unpacked.offset = offset 

2018 if base_chunks is None: 

2019 assert unpacked.pack_type_num == obj_type_num 

2020 else: 

2021 assert unpacked.pack_type_num in DELTA_TYPES 

2022 unpacked.obj_type_num = obj_type_num 

2023 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks) 

2024 return unpacked 

2025 

2026 def _follow_chain( 

2027 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2028 ) -> Iterator[T]: 

2029 # Unlike PackData.get_object_at, there is no need to cache offsets as 

2030 # this approach by design inflates each object exactly once. 

2031 todo = [(offset, obj_type_num, base_chunks)] 

2032 while todo: 

2033 (offset, obj_type_num, base_chunks) = todo.pop() 

2034 unpacked = self._resolve_object(offset, obj_type_num, base_chunks) 

2035 yield self._result(unpacked) 

2036 

2037 assert unpacked.offset is not None 

2038 unblocked = chain( 

2039 self._pending_ofs.pop(unpacked.offset, []), 

2040 self._pending_ref.pop(unpacked.sha(), []), 

2041 ) 

2042 todo.extend( 

2043 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore 

2044 for new_offset in unblocked 

2045 ) 

2046 

2047 def __iter__(self) -> Iterator[T]: 

2048 """Iterate over objects in the pack.""" 

2049 return self._walk_all_chains() 

2050 

2051 def ext_refs(self) -> list[bytes]: 

2052 """Return external references.""" 

2053 return self._ext_refs 

2054 

2055 
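# Illustrative usage (editor's sketch): the concrete subclasses below are
# normally built via for_pack_data(), which records every object once and then
# resolves delta chains as it iterates.  The pack path is hypothetical.
#
#     data = PackData.from_path("pack-abc123.pack")
#     for sha, offset, crc32 in PackIndexer.for_pack_data(data):
#         print(sha_to_hex(sha), offset, crc32)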

2056class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]): 

2057 """Delta chain iterator that yield unpacked objects.""" 

2058 

2059 def _result(self, unpacked: UnpackedObject) -> UnpackedObject: 

2060 """Return the unpacked object. 

2061 

2062 Args: 

2063 unpacked: The unpacked object 

2064 

2065 Returns: 

2066 The unpacked object unchanged 

2067 """ 

2068 return unpacked 

2069 

2070 

2071class PackIndexer(DeltaChainIterator[PackIndexEntry]): 

2072 """Delta chain iterator that yields index entries.""" 

2073 

2074 _compute_crc32 = True 

2075 

2076 def _result(self, unpacked: UnpackedObject) -> tuple: 

2077 """Convert unpacked object to pack index entry. 

2078 

2079 Args: 

2080 unpacked: The unpacked object 

2081 

2082 Returns: 

2083 Tuple of (sha, offset, crc32) for index entry 

2084 """ 

2085 return unpacked.sha(), unpacked.offset, unpacked.crc32 

2086 

2087 

2088class PackInflater(DeltaChainIterator[ShaFile]): 

2089 """Delta chain iterator that yields ShaFile objects.""" 

2090 

2091 def _result(self, unpacked: UnpackedObject) -> ShaFile: 

2092 """Convert unpacked object to ShaFile. 

2093 

2094 Args: 

2095 unpacked: The unpacked object 

2096 

2097 Returns: 

2098 ShaFile object from the unpacked data 

2099 """ 

2100 return unpacked.sha_file() 

2101 

2102 

2103class SHA1Reader(BinaryIO): 

2104 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2105 

2106 def __init__(self, f: IO[bytes]) -> None: 

2107 """Initialize SHA1Reader. 

2108 

2109 Args: 

2110 f: File-like object to wrap 

2111 """ 

2112 self.f = f 

2113 self.sha1 = sha1(b"") 

2114 

2115 def read(self, size: int = -1) -> bytes: 

2116 """Read bytes and update SHA1. 

2117 

2118 Args: 

2119 size: Number of bytes to read, -1 for all 

2120 

2121 Returns: 

2122 Bytes read from file 

2123 """ 

2124 data = self.f.read(size) 

2125 self.sha1.update(data) 

2126 return data 

2127 

2128 def check_sha(self, allow_empty: bool = False) -> None: 

2129 """Check if the SHA1 matches the expected value. 

2130 

2131 Args: 

2132 allow_empty: Allow empty SHA1 hash 

2133 

2134 Raises: 

2135 ChecksumMismatch: If SHA1 doesn't match 

2136 """ 

2137 stored = self.f.read(20) 

2138 # If git option index.skipHash is set the index will be empty 

2139 if stored != self.sha1.digest() and ( 

2140 not allow_empty 

2141 or sha_to_hex(stored) != b"0000000000000000000000000000000000000000" 

2142 ): 

2143 raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored)) 

2144 

2145 def close(self) -> None: 

2146 """Close the underlying file.""" 

2147 return self.f.close() 

2148 

2149 def tell(self) -> int: 

2150 """Return current file position.""" 

2151 return self.f.tell() 

2152 

2153 # BinaryIO abstract methods 

2154 def readable(self) -> bool: 

2155 """Check if file is readable.""" 

2156 return True 

2157 

2158 def writable(self) -> bool: 

2159 """Check if file is writable.""" 

2160 return False 

2161 

2162 def seekable(self) -> bool: 

2163 """Check if file is seekable.""" 

2164 return getattr(self.f, "seekable", lambda: False)() 

2165 

2166 def seek(self, offset: int, whence: int = 0) -> int: 

2167 """Seek to position in file. 

2168 

2169 Args: 

2170 offset: Position offset 

2171 whence: Reference point (0=start, 1=current, 2=end) 

2172 

2173 Returns: 

2174 New file position 

2175 """ 

2176 return self.f.seek(offset, whence) 

2177 

2178 def flush(self) -> None: 

2179 """Flush the file buffer.""" 

2180 if hasattr(self.f, "flush"): 

2181 self.f.flush() 

2182 

2183 def readline(self, size: int = -1) -> bytes: 

2184 """Read a line from the file. 

2185 

2186 Args: 

2187 size: Maximum bytes to read 

2188 

2189 Returns: 

2190 Line read from file 

2191 """ 

2192 return self.f.readline(size) 

2193 

2194 def readlines(self, hint: int = -1) -> list[bytes]: 

2195 """Read all lines from the file. 

2196 

2197 Args: 

2198 hint: Approximate number of bytes to read 

2199 

2200 Returns: 

2201 List of lines 

2202 """ 

2203 return self.f.readlines(hint) 

2204 

2205 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2206 """Write multiple lines to the file (not supported).""" 

2207 raise UnsupportedOperation("writelines") 

2208 

2209 def write(self, data: bytes, /) -> int: # type: ignore[override] 

2210 """Write data to the file (not supported).""" 

2211 raise UnsupportedOperation("write") 

2212 

2213 def __enter__(self) -> "SHA1Reader": 

2214 """Enter context manager.""" 

2215 return self 

2216 

2217 def __exit__( 

2218 self, 

2219 type: Optional[type], 

2220 value: Optional[BaseException], 

2221 traceback: Optional[TracebackType], 

2222 ) -> None: 

2223 """Exit context manager and close file.""" 

2224 self.close() 

2225 

2226 def __iter__(self) -> "SHA1Reader": 

2227 """Return iterator for reading file lines.""" 

2228 return self 

2229 

2230 def __next__(self) -> bytes: 

2231 """Get next line from file. 

2232 

2233 Returns: 

2234 Next line 

2235 

2236 Raises: 

2237 StopIteration: When no more lines 

2238 """ 

2239 line = self.readline() 

2240 if not line: 

2241 raise StopIteration 

2242 return line 

2243 

2244 def fileno(self) -> int: 

2245 """Return file descriptor number.""" 

2246 return self.f.fileno() 

2247 

2248 def isatty(self) -> bool: 

2249 """Check if file is a terminal.""" 

2250 return getattr(self.f, "isatty", lambda: False)() 

2251 

2252 def truncate(self, size: Optional[int] = None) -> int: 

2253 """Not supported for read-only file. 

2254 

2255 Raises: 

2256 UnsupportedOperation: Always raised 

2257 """ 

2258 raise UnsupportedOperation("truncate") 

2259 

2260 
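# Illustrative usage (editor's sketch): pack and index files end with a
# 20-byte SHA-1 over everything before it, which is exactly what SHA1Reader
# verifies.  The filename is hypothetical.
#
#     with open("pack-abc123.idx", "rb") as raw:
#         reader = SHA1Reader(raw)
#         reader.read(os.path.getsize("pack-abc123.idx") - 20)
#         reader.check_sha()          # raises ChecksumMismatch on corruption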

2261class SHA1Writer(BinaryIO): 

2262 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2263 

2264 def __init__(self, f) -> None: 

2265 """Initialize SHA1Writer. 

2266 

2267 Args: 

2268 f: File-like object to wrap 

2269 """ 

2270 self.f = f 

2271 self.length = 0 

2272 self.sha1 = sha1(b"") 

2273 self.digest: Optional[bytes] = None 

2274 

2275 def write(self, data) -> int: 

2276 """Write data and update SHA1. 

2277 

2278 Args: 

2279 data: Data to write 

2280 

2281 Returns: 

2282 Number of bytes written 

2283 """ 

2284 self.sha1.update(data) 

2285 self.f.write(data) 

2286 self.length += len(data) 

2287 return len(data) 

2288 

2289 def write_sha(self) -> bytes: 

2290 """Write the SHA1 digest to the file. 

2291 

2292 Returns: 

2293 The SHA1 digest bytes 

2294 """ 

2295 sha = self.sha1.digest() 

2296 assert len(sha) == 20 

2297 self.f.write(sha) 

2298 self.length += len(sha) 

2299 return sha 

2300 

2301 def close(self) -> None: 

2302 """Close the pack file and finalize the SHA.""" 

2303 self.digest = self.write_sha() 

2304 self.f.close() 

2305 

2306 def offset(self) -> int: 

2307 """Get the total number of bytes written. 

2308 

2309 Returns: 

2310 Total bytes written 

2311 """ 

2312 return self.length 

2313 

2314 def tell(self) -> int: 

2315 """Return current file position.""" 

2316 return self.f.tell() 

2317 

2318 # BinaryIO abstract methods 

2319 def readable(self) -> bool: 

2320 """Check if file is readable.""" 

2321 return False 

2322 

2323 def writable(self) -> bool: 

2324 """Check if file is writable.""" 

2325 return True 

2326 

2327 def seekable(self) -> bool: 

2328 """Check if file is seekable.""" 

2329 return getattr(self.f, "seekable", lambda: False)() 

2330 

2331 def seek(self, offset: int, whence: int = 0) -> int: 

2332 """Seek to position in file. 

2333 

2334 Args: 

2335 offset: Position offset 

2336 whence: Reference point (0=start, 1=current, 2=end) 

2337 

2338 Returns: 

2339 New file position 

2340 """ 

2341 return self.f.seek(offset, whence) 

2342 

2343 def flush(self) -> None: 

2344 """Flush the file buffer.""" 

2345 if hasattr(self.f, "flush"): 

2346 self.f.flush() 

2347 

2348 def readline(self, size: int = -1) -> bytes: 

2349 """Not supported for write-only file. 

2350 

2351 Raises: 

2352 UnsupportedOperation: Always raised 

2353 """ 

2354 raise UnsupportedOperation("readline") 

2355 

2356 def readlines(self, hint: int = -1) -> list[bytes]: 

2357 """Not supported for write-only file. 

2358 

2359 Raises: 

2360 UnsupportedOperation: Always raised 

2361 """ 

2362 raise UnsupportedOperation("readlines") 

2363 

2364 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2365 """Write multiple lines to the file. 

2366 

2367 Args: 

2368 lines: Iterable of lines to write 

2369 """ 

2370 for line in lines: 

2371 self.write(line) 

2372 

2373 def read(self, size: int = -1) -> bytes: 

2374 """Not supported for write-only file. 

2375 

2376 Raises: 

2377 UnsupportedOperation: Always raised 

2378 """ 

2379 raise UnsupportedOperation("read") 

2380 

2381 def __enter__(self) -> "SHA1Writer": 

2382 """Enter context manager.""" 

2383 return self 

2384 

2385 def __exit__( 

2386 self, 

2387 type: Optional[type], 

2388 value: Optional[BaseException], 

2389 traceback: Optional[TracebackType], 

2390 ) -> None: 

2391 """Exit context manager and close file.""" 

2392 self.close() 

2393 

2394 def __iter__(self) -> "SHA1Writer": 

2395 """Return iterator.""" 

2396 return self 

2397 

2398 def __next__(self) -> bytes: 

2399 """Not supported for write-only file. 

2400 

2401 Raises: 

2402 UnsupportedOperation: Always raised 

2403 """ 

2404 raise UnsupportedOperation("__next__") 

2405 

2406 def fileno(self) -> int: 

2407 """Return file descriptor number.""" 

2408 return self.f.fileno() 

2409 

2410 def isatty(self) -> bool: 

2411 """Check if file is a terminal.""" 

2412 return getattr(self.f, "isatty", lambda: False)() 

2413 

2414 def truncate(self, size: Optional[int] = None) -> int: 

2415 """Not supported for write-only file. 

2416 

2417 Raises: 

2418 UnsupportedOperation: Always raised 

2419 """ 

2420 raise UnsupportedOperation("truncate") 

2421 

2422 
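# Illustrative usage (editor's sketch): SHA1Writer appends the SHA-1 of
# everything written when it is closed, matching the trailer format of pack
# and index files.  The filename is hypothetical.
#
#     f = SHA1Writer(open("example.idx", "wb"))
#     f.write(b"\377tOc")             # any payload; here the pack index magic
#     f.close()                       # appends the 20-byte SHA-1 trailer
#     assert f.digest is not None and len(f.digest) == 20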

2423def pack_object_header( 

2424 type_num: int, delta_base: Optional[Union[bytes, int]], size: int 

2425) -> bytearray: 

2426 """Create a pack object header for the given object info. 

2427 

2428 Args: 

2429 type_num: Numeric type of the object. 

2430 delta_base: Delta base offset or ref, or None for whole objects. 

2431 size: Uncompressed object size. 

2432 Returns: A header for a packed object. 

2433 """ 

2434 header = [] 

2435 c = (type_num << 4) | (size & 15) 

2436 size >>= 4 

2437 while size: 

2438 header.append(c | 0x80) 

2439 c = size & 0x7F 

2440 size >>= 7 

2441 header.append(c) 

2442 if type_num == OFS_DELTA: 

2443 assert isinstance(delta_base, int) 

2444 ret = [delta_base & 0x7F] 

2445 delta_base >>= 7 

2446 while delta_base: 

2447 delta_base -= 1 

2448 ret.insert(0, 0x80 | (delta_base & 0x7F)) 

2449 delta_base >>= 7 

2450 header.extend(ret) 

2451 elif type_num == REF_DELTA: 

2452 assert isinstance(delta_base, bytes) 

2453 assert len(delta_base) == 20 

2454 header += delta_base 

2455 return bytearray(header) 

2456 

2457 
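# Worked example (editor's sketch): the header for a non-delta blob (type 3)
# of 100 bytes is two bytes.  The first byte carries the continuation flag,
# the type and the low four size bits; the second byte holds the rest of the size.
#
#     >>> bytes(pack_object_header(3, None, 100)).hex()
#     'b406'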

2458def pack_object_chunks( 

2459 type: int, 

2460 object: Union[ 

2461 ShaFile, bytes, list[bytes], tuple[Union[bytes, int], Union[bytes, list[bytes]]] 

2462 ], 

2463 compression_level: int = -1, 

2464) -> Iterator[bytes]: 

2465 """Generate chunks for a pack object. 

2466 

2467 Args: 

2468 type: Numeric type of the object 

2469 object: Object to write 

2470 compression_level: the zlib compression level 

2471 Returns: Chunks 

2472 """ 

2473 if type in DELTA_TYPES: 

2474 if isinstance(object, tuple): 

2475 delta_base, object = object 

2476 else: 

2477 raise TypeError("Delta types require a tuple of (delta_base, object)") 

2478 else: 

2479 delta_base = None 

2480 

2481 # Convert object to list of bytes chunks 

2482 if isinstance(object, bytes): 

2483 chunks = [object] 

2484 elif isinstance(object, list): 

2485 chunks = object 

2486 elif isinstance(object, ShaFile): 

2487 chunks = object.as_raw_chunks() 

2488 else: 

2489 # Shouldn't reach here with proper typing 

2490 raise TypeError(f"Unexpected object type: {object.__class__.__name__}") 

2491 

2492 yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks)))) 

2493 compressor = zlib.compressobj(level=compression_level) 

2494 for data in chunks: 

2495 yield compressor.compress(data) 

2496 yield compressor.flush() 

2497 

2498 

2499def write_pack_object( 

2500 write: Callable[[bytes], int], 

2501 type: int, 

2502 object: ShaFile, 

2503 sha: Optional["HashObject"] = None, 

2504 compression_level: int = -1, 

2505) -> int: 

2506 """Write pack object to a file. 

2507 

2508 Args: 

2509 write: Write function to use 

2510 type: Numeric type of the object 

2511 object: Object to write 

2512 sha: Optional SHA-1 hasher to update 

2513 compression_level: the zlib compression level 

2514 Returns: CRC32 checksum of the written object 

2515 """ 

2516 crc32 = 0 

2517 for chunk in pack_object_chunks(type, object, compression_level=compression_level): 

2518 write(chunk) 

2519 if sha is not None: 

2520 sha.update(chunk) 

2521 crc32 = binascii.crc32(chunk, crc32) 

2522 return crc32 & 0xFFFFFFFF 

2523 

2524 
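# Illustrative usage (editor's sketch): writing a single object into an
# in-memory pack stream while keeping a running SHA-1 of the stream.  Blob
# comes from dulwich.objects; the content is made up for the example.
#
#     from dulwich.objects import Blob
#
#     buf = BytesIO()
#     cs = sha1(b"")
#     for chunk in pack_header_chunks(1):
#         buf.write(chunk)
#         cs.update(chunk)
#     blob = Blob.from_string(b"hello world\n")
#     crc32 = write_pack_object(buf.write, blob.type_num, blob, sha=cs)
#     buf.write(cs.digest())          # trailing pack checksum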

2525def write_pack( 

2526 filename, 

2527 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2528 *, 

2529 deltify: Optional[bool] = None, 

2530 delta_window_size: Optional[int] = None, 

2531 compression_level: int = -1, 

2532): 

2533 """Write a new pack data file. 

2534 

2535 Args: 

2536 filename: Path to the new pack file (without .pack extension) 

2537 objects: Objects to write to the pack 

2538 delta_window_size: Delta window size 

2539 deltify: Whether to deltify pack objects 

2540 compression_level: the zlib compression level 

2541 Returns: Tuple with checksum of pack file and index file 

2542 """ 

2543 with GitFile(filename + ".pack", "wb") as f: 

2544 entries, data_sum = write_pack_objects( 

2545 f.write, 

2546 objects, 

2547 delta_window_size=delta_window_size, 

2548 deltify=deltify, 

2549 compression_level=compression_level, 

2550 ) 

2551 entries = sorted([(k, v[0], v[1]) for (k, v) in entries.items()]) 

2552 with GitFile(filename + ".idx", "wb") as f: 

2553 return data_sum, write_pack_index(f, entries, data_sum) 

2554 

2555 
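# Illustrative usage (editor's sketch): write_pack() produces both
# "<basename>.pack" and "<basename>.idx" in one call.  The basename is
# hypothetical.
#
#     from dulwich.objects import Blob
#
#     objs = [(Blob.from_string(b"one\n"), None), (Blob.from_string(b"two\n"), None)]
#     data_sha, idx_sha = write_pack("/tmp/pack-example", objs)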

2556def pack_header_chunks(num_objects: int) -> Iterator[bytes]: 

2557 """Yield chunks for a pack header.""" 

2558 yield b"PACK" # Pack header 

2559 yield struct.pack(b">L", 2) # Pack version 

2560 yield struct.pack(b">L", num_objects) # Number of objects in pack 

2561 

2562 

2563def write_pack_header(write, num_objects) -> None: 

2564 """Write a pack header for the given number of objects.""" 

2565 if hasattr(write, "write"): 

2566 write = write.write 

2567 warnings.warn( 

2568 "write_pack_header() now takes a write rather than file argument", 

2569 DeprecationWarning, 

2570 stacklevel=2, 

2571 ) 

2572 for chunk in pack_header_chunks(num_objects): 

2573 write(chunk) 

2574 

2575 
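# Worked example (editor's sketch): a pack header is always 12 bytes -- the
# b"PACK" signature, the version (2) and the object count, both big-endian.
#
#     >>> b"".join(pack_header_chunks(3))
#     b'PACK\x00\x00\x00\x02\x00\x00\x00\x03'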

2576def find_reusable_deltas( 

2577 container: PackedObjectContainer, 

2578 object_ids: set[bytes], 

2579 *, 

2580 other_haves: Optional[set[bytes]] = None, 

2581 progress=None, 

2582) -> Iterator[UnpackedObject]: 

2583 """Find deltas in a pack that can be reused. 

2584 

2585 Args: 

2586 container: Pack container to search for deltas 

2587 object_ids: Set of object IDs to find deltas for 

2588 other_haves: Set of other object IDs we have 

2589 progress: Optional progress reporting callback 

2590 

2591 Returns: 

2592 Iterator of UnpackedObject entries that can be reused 

2593 """ 

2594 if other_haves is None: 

2595 other_haves = set() 

2596 reused = 0 

2597 for i, unpacked in enumerate( 

2598 container.iter_unpacked_subset( 

2599 object_ids, allow_missing=True, convert_ofs_delta=True 

2600 ) 

2601 ): 

2602 if progress is not None and i % 1000 == 0: 

2603 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode()) 

2604 if unpacked.pack_type_num == REF_DELTA: 

2605 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore 

2606 if hexsha in object_ids or hexsha in other_haves: 

2607 yield unpacked 

2608 reused += 1 

2609 if progress is not None: 

2610 progress((f"found {reused} deltas to reuse\n").encode()) 

2611 

2612 

2613def deltify_pack_objects( 

2614 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[bytes]]]], 

2615 *, 

2616 window_size: Optional[int] = None, 

2617 progress=None, 

2618) -> Iterator[UnpackedObject]: 

2619 """Generate deltas for pack objects. 

2620 

2621 Args: 

2622 objects: An iterable of objects or (object, path) tuples to deltify. 

2623 window_size: Window size; None for default 

2624 progress: Optional progress reporting callback 

2625 Returns: Iterator of UnpackedObject entries; 

2626 delta_base is None for full-text entries 

2627 """ 

2628 

2629 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, Optional[bytes]]]]: 

2630 for e in objects: 

2631 if isinstance(e, ShaFile): 

2632 yield (e, (e.type_num, None)) 

2633 else: 

2634 yield (e[0], (e[0].type_num, e[1])) 

2635 

2636 yield from deltas_from_sorted_objects( 

2637 sort_objects_for_delta(objects_with_hints()), 

2638 window_size=window_size, 

2639 progress=progress, 

2640 ) 

2641 

2642 

2643def sort_objects_for_delta( 

2644 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[PackHint]]]], 

2645) -> Iterator[ShaFile]: 

2646 """Sort objects for optimal delta compression. 

2647 

2648 Args: 

2649 objects: Iterator of objects or (object, hint) tuples 

2650 

2651 Returns: 

2652 Iterator of sorted ShaFile objects 

2653 """ 

2654 magic = [] 

2655 for entry in objects: 

2656 if isinstance(entry, tuple): 

2657 obj, hint = entry 

2658 if hint is None: 

2659 type_num = None 

2660 path = None 

2661 else: 

2662 (type_num, path) = hint 

2663 else: 

2664 obj = entry; type_num = None; path = None  # plain ShaFile: no delta hint 

2665 magic.append((type_num, path, -obj.raw_length(), obj)) 

2666 # Build a list of objects ordered by the magic Linus heuristic 

2667 # This helps us find good objects to diff against 

2668 magic.sort() 

2669 return (x[3] for x in magic) 

2670 

2671 

2672def deltas_from_sorted_objects( 

2673 objects, window_size: Optional[int] = None, progress=None 

2674): 

2675 """Create deltas from sorted objects. 

2676 

2677 Args: 

2678 objects: Iterator of sorted objects to deltify 

2679 window_size: Delta window size; None for default 

2680 progress: Optional progress reporting callback 

2681 

2682 Returns: 

2683 Iterator of UnpackedObject entries 

2684 """ 

2685 # TODO(jelmer): Use threads 

2686 if window_size is None: 

2687 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE 

2688 

2689 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque() 

2690 for i, o in enumerate(objects): 

2691 if progress is not None and i % 1000 == 0: 

2692 progress((f"generating deltas: {i}\r").encode()) 

2693 raw = o.as_raw_chunks() 

2694 winner = raw 

2695 winner_len = sum(map(len, winner)) 

2696 winner_base = None 

2697 for base_id, base_type_num, base in possible_bases: 

2698 if base_type_num != o.type_num: 

2699 continue 

2700 delta_len = 0 

2701 delta = [] 

2702 for chunk in create_delta(b"".join(base), b"".join(raw)): 

2703 delta_len += len(chunk) 

2704 if delta_len >= winner_len: 

2705 break 

2706 delta.append(chunk) 

2707 else: 

2708 winner_base = base_id 

2709 winner = delta 

2710 winner_len = sum(map(len, winner)) 

2711 yield UnpackedObject( 

2712 o.type_num, 

2713 sha=o.sha().digest(), 

2714 delta_base=winner_base, 

2715 decomp_len=winner_len, 

2716 decomp_chunks=winner, 

2717 ) 

2718 possible_bases.appendleft((o.sha().digest(), o.type_num, raw)) 

2719 while len(possible_bases) > window_size: 

2720 possible_bases.pop() 

2721 

2722 
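# Illustrative usage (editor's sketch): feeding two similar blobs through
# deltify_pack_objects().  They are re-sorted by the size heuristic above; the
# larger one is emitted as a full text and the other comes out with delta_base
# pointing at it whenever the delta is smaller than the full text.
#
#     from dulwich.objects import Blob
#
#     a = Blob.from_string(b"x" * 1000)
#     b = Blob.from_string(b"x" * 1000 + b"y")
#     for unpacked in deltify_pack_objects(iter([a, b])):
#         print(unpacked.sha(), unpacked.delta_base is not None)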

2723def pack_objects_to_data( 

2724 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2725 *, 

2726 deltify: Optional[bool] = None, 

2727 delta_window_size: Optional[int] = None, 

2728 ofs_delta: bool = True, 

2729 progress=None, 

2730) -> tuple[int, Iterator[UnpackedObject]]: 

2731 """Create pack data from objects. 

2732 

2733 Args: 

2734 objects: Pack objects 

2735 deltify: Whether to deltify pack objects 

2736 delta_window_size: Delta window size 

2737 ofs_delta: Whether to use offset deltas 

2738 progress: Optional progress reporting callback 

2739 Returns: Tuple of (number of objects, iterator over UnpackedObject entries) 

2740 """ 

2741 # TODO(jelmer): support deltaifying 

2742 count = len(objects) 

2743 if deltify is None: 

2744 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2745 # slow at the moment. 

2746 deltify = False 

2747 if deltify: 

2748 return ( 

2749 count, 

2750 deltify_pack_objects( 

2751 iter(objects), # type: ignore 

2752 window_size=delta_window_size, 

2753 progress=progress, 

2754 ), 

2755 ) 

2756 else: 

2757 

2758 def iter_without_path() -> Iterator[UnpackedObject]: 

2759 for o in objects: 

2760 if isinstance(o, tuple): 

2761 yield full_unpacked_object(o[0]) 

2762 else: 

2763 yield full_unpacked_object(o) 

2764 

2765 return (count, iter_without_path()) 

2766 

2767 

2768def generate_unpacked_objects( 

2769 container: PackedObjectContainer, 

2770 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2771 delta_window_size: Optional[int] = None, 

2772 deltify: Optional[bool] = None, 

2773 reuse_deltas: bool = True, 

2774 ofs_delta: bool = True, 

2775 other_haves: Optional[set[bytes]] = None, 

2776 progress=None, 

2777) -> Iterator[UnpackedObject]: 

2778 """Create pack data from objects. 

2779 

2780 Returns: Iterator over UnpackedObject entries 

2781 """ 

2782 todo = dict(object_ids) 

2783 if reuse_deltas: 

2784 for unpack in find_reusable_deltas( 

2785 container, set(todo), other_haves=other_haves, progress=progress 

2786 ): 

2787 del todo[sha_to_hex(unpack.sha())] 

2788 yield unpack 

2789 if deltify is None: 

2790 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2791 # slow at the moment. 

2792 deltify = False 

2793 if deltify: 

2794 objects_to_delta = container.iterobjects_subset( 

2795 todo.keys(), allow_missing=False 

2796 ) 

2797 yield from deltas_from_sorted_objects( 

2798 sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta), 

2799 window_size=delta_window_size, 

2800 progress=progress, 

2801 ) 

2802 else: 

2803 for oid in todo: 

2804 yield full_unpacked_object(container[oid]) 

2805 

2806 

2807def full_unpacked_object(o: ShaFile) -> UnpackedObject: 

2808 """Create an UnpackedObject from a ShaFile. 

2809 

2810 Args: 

2811 o: ShaFile object to convert 

2812 

2813 Returns: 

2814 UnpackedObject with full object data 

2815 """ 

2816 return UnpackedObject( 

2817 o.type_num, 

2818 delta_base=None, 

2819 crc32=None, 

2820 decomp_chunks=o.as_raw_chunks(), 

2821 sha=o.sha().digest(), 

2822 ) 

2823 

2824 

2825def write_pack_from_container( 

2826 write, 

2827 container: PackedObjectContainer, 

2828 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2829 delta_window_size: Optional[int] = None, 

2830 deltify: Optional[bool] = None, 

2831 reuse_deltas: bool = True, 

2832 compression_level: int = -1, 

2833 other_haves: Optional[set[bytes]] = None, 

2834): 

2835 """Write a new pack data file. 

2836 

2837 Args: 

2838 write: write function to use 

2839 container: PackedObjectContainer 

2840 object_ids: Sequence of (object_id, hint) tuples to write 

2841 delta_window_size: Sliding window size for searching for deltas; 

2842 Set to None for default window size. 

2843 deltify: Whether to deltify objects 

2844 reuse_deltas: Whether to reuse existing deltas 

2845 compression_level: the zlib compression level to use 

2846 other_haves: Set of additional object IDs the receiver has 

2847 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2848 """ 

2849 pack_contents_count = len(object_ids) 

2850 pack_contents = generate_unpacked_objects( 

2851 container, 

2852 object_ids, 

2853 delta_window_size=delta_window_size, 

2854 deltify=deltify, 

2855 reuse_deltas=reuse_deltas, 

2856 other_haves=other_haves, 

2857 ) 

2858 

2859 return write_pack_data( 

2860 write, 

2861 pack_contents, 

2862 num_records=pack_contents_count, 

2863 compression_level=compression_level, 

2864 ) 

2865 

2866 

2867def write_pack_objects( 

2868 write, 

2869 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2870 *, 

2871 delta_window_size: Optional[int] = None, 

2872 deltify: Optional[bool] = None, 

2873 compression_level: int = -1, 

2874): 

2875 """Write a new pack data file. 

2876 

2877 Args: 

2878 write: write function to use 

2879 objects: Sequence of objects or (object, path) tuples to write 

2880 delta_window_size: Sliding window size for searching for deltas; 

2881 Set to None for default window size. 

2882 deltify: Whether to deltify objects 

2883 compression_level: the zlib compression level to use 

2884 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2885 """ 

2886 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify) 

2887 

2888 return write_pack_data( 

2889 write, 

2890 pack_contents, 

2891 num_records=pack_contents_count, 

2892 compression_level=compression_level, 

2893 ) 

2894 

2895 

2896class PackChunkGenerator: 

2897 """Generator for pack data chunks.""" 

2898 

2899 def __init__( 

2900 self, 

2901 num_records=None, 

2902 records=None, 

2903 progress=None, 

2904 compression_level=-1, 

2905 reuse_compressed=True, 

2906 ) -> None: 

2907 """Initialize PackChunkGenerator. 

2908 

2909 Args: 

2910 num_records: Expected number of records 

2911 records: Iterator of pack records 

2912 progress: Optional progress callback 

2913 compression_level: Compression level (-1 for default) 

2914 reuse_compressed: Whether to reuse compressed chunks 

2915 """ 

2916 self.cs = sha1(b"") 

2917 self.entries: dict[Union[int, bytes], tuple[int, int]] = {} 

2918 self._it = self._pack_data_chunks( 

2919 num_records=num_records, 

2920 records=records, 

2921 progress=progress, 

2922 compression_level=compression_level, 

2923 reuse_compressed=reuse_compressed, 

2924 ) 

2925 

2926 def sha1digest(self) -> bytes: 

2927 """Return the SHA1 digest of the pack data.""" 

2928 return self.cs.digest() 

2929 

2930 def __iter__(self) -> Iterator[bytes]: 

2931 """Iterate over pack data chunks.""" 

2932 return self._it 

2933 

2934 def _pack_data_chunks( 

2935 self, 

2936 records: Iterator[UnpackedObject], 

2937 *, 

2938 num_records=None, 

2939 progress=None, 

2940 compression_level: int = -1, 

2941 reuse_compressed: bool = True, 

2942 ) -> Iterator[bytes]: 

2943 """Iterate pack data file chunks. 

2944 

2945 Args: 

2946 records: Iterator over UnpackedObject 

2947 num_records: Number of records (defaults to len(records) if not specified) 

2948 progress: Function to report progress to 

2949 compression_level: the zlib compression level 

2950 reuse_compressed: Whether to reuse compressed chunks 

2951 Yields: Pack data chunks; per-object entries and the checksum are recorded on self 

2952 """ 

2953 # Write the pack 

2954 if num_records is None: 

2955 num_records = len(records) # type: ignore 

2956 offset = 0 

2957 for chunk in pack_header_chunks(num_records): 

2958 yield chunk 

2959 self.cs.update(chunk) 

2960 offset += len(chunk) 

2961 actual_num_records = 0 

2962 for i, unpacked in enumerate(records): 

2963 type_num = unpacked.pack_type_num 

2964 if progress is not None and i % 1000 == 0: 

2965 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii")) 

2966 raw: Union[list[bytes], tuple[int, list[bytes]], tuple[bytes, list[bytes]]] 

2967 if unpacked.delta_base is not None: 

2968 try: 

2969 base_offset, base_crc32 = self.entries[unpacked.delta_base] 

2970 except KeyError: 

2971 type_num = REF_DELTA 

2972 assert isinstance(unpacked.delta_base, bytes) 

2973 raw = (unpacked.delta_base, unpacked.decomp_chunks) 

2974 else: 

2975 type_num = OFS_DELTA 

2976 raw = (offset - base_offset, unpacked.decomp_chunks) 

2977 else: 

2978 raw = unpacked.decomp_chunks 

2979 chunks: Union[list[bytes], Iterator[bytes]] 

2980 if unpacked.comp_chunks is not None and reuse_compressed: 

2981 chunks = unpacked.comp_chunks 

2982 else: 

2983 chunks = pack_object_chunks( 

2984 type_num, raw, compression_level=compression_level 

2985 ) 

2986 crc32 = 0 

2987 object_size = 0 

2988 for chunk in chunks: 

2989 yield chunk 

2990 crc32 = binascii.crc32(chunk, crc32) 

2991 self.cs.update(chunk) 

2992 object_size += len(chunk) 

2993 actual_num_records += 1 

2994 self.entries[unpacked.sha()] = (offset, crc32) 

2995 offset += object_size 

2996 if actual_num_records != num_records: 

2997 raise AssertionError( 

2998 f"actual records written differs: {actual_num_records} != {num_records}" 

2999 ) 

3000 

3001 yield self.cs.digest() 

3002 

3003 

3004def write_pack_data( 

3005 write, 

3006 records: Iterator[UnpackedObject], 

3007 *, 

3008 num_records=None, 

3009 progress=None, 

3010 compression_level=-1, 

3011): 

3012 """Write a new pack data file. 

3013 

3014 Args: 

3015 write: Write function to use 

3016 records: Iterator over UnpackedObject records to write 

3017 num_records: Number of records (defaults to len(records) if None) 

3018 progress: Function to report progress to 

3019 compression_level: the zlib compression level 

3020 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3021 """ 

3022 chunk_generator = PackChunkGenerator( 

3023 num_records=num_records, 

3024 records=records, 

3025 progress=progress, 

3026 compression_level=compression_level, 

3027 ) 

3028 for chunk in chunk_generator: 

3029 write(chunk) 

3030 return chunk_generator.entries, chunk_generator.sha1digest() 

3031 

3032 
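# Illustrative usage (editor's sketch): streaming a single full-text record
# through write_pack_data() into an in-memory buffer.  The returned entries
# map the binary object SHA to (offset, crc32).
#
#     from dulwich.objects import Blob
#
#     buf = BytesIO()
#     blob = Blob.from_string(b"example\n")
#     entries, pack_sha = write_pack_data(
#         buf.write, iter([full_unpacked_object(blob)]), num_records=1
#     )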

3033def write_pack_index_v1( 

3034 f: BinaryIO, entries: list[tuple[bytes, int, Optional[int]]], pack_checksum: bytes 

3035) -> bytes: 

3036 """Write a new pack index file. 

3037 

3038 Args: 

3039 f: A file-like object to write to 

3040 entries: List of tuples with object name (sha), offset_in_pack, 

3041 and crc32_checksum. 

3042 pack_checksum: Checksum of the pack file. 

3043 Returns: The SHA of the written index file 

3044 """ 

3045 f = SHA1Writer(f) 

3046 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3047 for name, _offset, _entry_checksum in entries: 

3048 fan_out_table[ord(name[:1])] += 1 

3049 # Fan-out table 

3050 for i in range(0x100): 

3051 f.write(struct.pack(">L", fan_out_table[i])) 

3052 fan_out_table[i + 1] += fan_out_table[i] 

3053 for name, offset, _entry_checksum in entries: 

3054 if not (offset <= 0xFFFFFFFF): 

3055 raise TypeError("pack format 1 only supports offsets < 4Gb") 

3056 f.write(struct.pack(">L20s", offset, name)) 

3057 assert len(pack_checksum) == 20 

3058 f.write(pack_checksum) 

3059 return f.write_sha() 

3060 

3061 

3062def _delta_encode_size(size) -> bytes: 

3063 ret = bytearray() 

3064 c = size & 0x7F 

3065 size >>= 7 

3066 while size: 

3067 ret.append(c | 0x80) 

3068 c = size & 0x7F 

3069 size >>= 7 

3070 ret.append(c) 

3071 return bytes(ret) 

3072 

3073 

3074# The length of delta compression copy operations in version 2 packs is limited 

3075# to 64K. To copy more, we use several copy operations. Version 3 packs allow 

3076# 24-bit lengths in copy operations, but we always make version 2 packs. 

3077_MAX_COPY_LEN = 0xFFFF 

3078 

3079 

3080def _encode_copy_operation(start: int, length: int) -> bytes: 

3081 scratch = bytearray([0x80]) 

3082 for i in range(4): 

3083 if start & 0xFF << i * 8: 

3084 scratch.append((start >> i * 8) & 0xFF) 

3085 scratch[0] |= 1 << i 

3086 for i in range(2): 

3087 if length & 0xFF << i * 8: 

3088 scratch.append((length >> i * 8) & 0xFF) 

3089 scratch[0] |= 1 << (4 + i) 

3090 return bytes(scratch) 

3091 

3092 

3093def create_delta(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3094 """Use python difflib to work out how to transform base_buf to target_buf. 

3095 

3096 Args: 

3097 base_buf: Base buffer 

3098 target_buf: Target buffer 

3099 """ 

3100 if isinstance(base_buf, list): 

3101 base_buf = b"".join(base_buf) 

3102 if isinstance(target_buf, list): 

3103 target_buf = b"".join(target_buf) 

3104 assert isinstance(base_buf, bytes) 

3105 assert isinstance(target_buf, bytes) 

3106 # write delta header 

3107 yield _delta_encode_size(len(base_buf)) 

3108 yield _delta_encode_size(len(target_buf)) 

3109 # write out delta opcodes 

3110 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf) 

3111 for opcode, i1, i2, j1, j2 in seq.get_opcodes(): 

3112 # Git patch opcodes don't care about deletes! 

3113 # if opcode == 'replace' or opcode == 'delete': 

3114 # pass 

3115 if opcode == "equal": 

3116 # If they are equal, unpacker will use data from base_buf 

3117 # Write out an opcode that says what range to use 

3118 copy_start = i1 

3119 copy_len = i2 - i1 

3120 while copy_len > 0: 

3121 to_copy = min(copy_len, _MAX_COPY_LEN) 

3122 yield _encode_copy_operation(copy_start, to_copy) 

3123 copy_start += to_copy 

3124 copy_len -= to_copy 

3125 if opcode == "replace" or opcode == "insert": 

3126 # If we are replacing a range or adding one, then we just 

3127 # output it to the stream (prefixed by its size) 

3128 s = j2 - j1 

3129 o = j1 

3130 while s > 127: 

3131 yield bytes([127]) 

3132 yield memoryview(target_buf)[o : o + 127] 

3133 s -= 127 

3134 o += 127 

3135 yield bytes([s]) 

3136 yield memoryview(target_buf)[o : o + s] 

3137 

3138 

3139def apply_delta( 

3140 src_buf: Union[bytes, list[bytes]], delta: Union[bytes, list[bytes]] 

3141) -> list[bytes]: 

3142 """Based on the similar function in git's patch-delta.c. 

3143 

3144 Args: 

3145 src_buf: Source buffer 

3146 delta: Delta instructions 

3147 """ 

3148 if not isinstance(src_buf, bytes): 

3149 src_buf = b"".join(src_buf) 

3150 if not isinstance(delta, bytes): 

3151 delta = b"".join(delta) 

3152 out = [] 

3153 index = 0 

3154 delta_length = len(delta) 

3155 

3156 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]: 

3157 size = 0 

3158 i = 0 

3159 while delta: 

3160 cmd = ord(delta[index : index + 1]) 

3161 index += 1 

3162 size |= (cmd & ~0x80) << i 

3163 i += 7 

3164 if not cmd & 0x80: 

3165 break 

3166 return size, index 

3167 

3168 src_size, index = get_delta_header_size(delta, index) 

3169 dest_size, index = get_delta_header_size(delta, index) 

3170 if src_size != len(src_buf): 

3171 raise ApplyDeltaError( 

3172 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}" 

3173 ) 

3174 while index < delta_length: 

3175 cmd = ord(delta[index : index + 1]) 

3176 index += 1 

3177 if cmd & 0x80: 

3178 cp_off = 0 

3179 for i in range(4): 

3180 if cmd & (1 << i): 

3181 x = ord(delta[index : index + 1]) 

3182 index += 1 

3183 cp_off |= x << (i * 8) 

3184 cp_size = 0 

3185 # Version 3 packs can contain copy sizes larger than 64K. 

3186 for i in range(3): 

3187 if cmd & (1 << (4 + i)): 

3188 x = ord(delta[index : index + 1]) 

3189 index += 1 

3190 cp_size |= x << (i * 8) 

3191 if cp_size == 0: 

3192 cp_size = 0x10000 

3193 if ( 

3194 cp_off + cp_size < cp_size 

3195 or cp_off + cp_size > src_size 

3196 or cp_size > dest_size 

3197 ): 

3198 break 

3199 out.append(src_buf[cp_off : cp_off + cp_size]) 

3200 elif cmd != 0: 

3201 out.append(delta[index : index + cmd]) 

3202 index += cmd 

3203 else: 

3204 raise ApplyDeltaError("Invalid opcode 0") 

3205 

3206 if index != delta_length: 

3207 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}") 

3208 

3209 if dest_size != chunks_length(out): 

3210 raise ApplyDeltaError("dest size incorrect") 

3211 

3212 return out 

3213 

3214 
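# Illustrative round trip (editor's sketch): a delta produced by create_delta()
# can be fed straight back into apply_delta() to rebuild the target buffer.
#
#     base = b"the quick brown fox\n"
#     target = b"the quick red fox\n"
#     delta = b"".join(create_delta(base, target))
#     assert b"".join(apply_delta(base, delta)) == target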

3215def write_pack_index_v2( 

3216 f, entries: Iterable[PackIndexEntry], pack_checksum: bytes 

3217) -> bytes: 

3218 """Write a new pack index file. 

3219 

3220 Args: 

3221 f: File-like object to write to 

3222 entries: List of tuples with object name (sha), offset_in_pack, and 

3223 crc32_checksum. 

3224 pack_checksum: Checksum of the pack file. 

3225 Returns: The SHA of the index file written 

3226 """ 

3227 f = SHA1Writer(f) 

3228 f.write(b"\377tOc") # Magic! 

3229 f.write(struct.pack(">L", 2)) 

3230 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3231 for name, offset, entry_checksum in entries: 

3232 fan_out_table[ord(name[:1])] += 1 

3233 # Fan-out table 

3234 largetable: list[int] = [] 

3235 for i in range(0x100): 

3236 f.write(struct.pack(b">L", fan_out_table[i])) 

3237 fan_out_table[i + 1] += fan_out_table[i] 

3238 for name, offset, entry_checksum in entries: 

3239 f.write(name) 

3240 for name, offset, entry_checksum in entries: 

3241 f.write(struct.pack(b">L", entry_checksum)) 

3242 for name, offset, entry_checksum in entries: 

3243 if offset < 2**31: 

3244 f.write(struct.pack(b">L", offset)) 

3245 else: 

3246 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3247 largetable.append(offset) 

3248 for offset in largetable: 

3249 f.write(struct.pack(b">Q", offset)) 

3250 assert len(pack_checksum) == 20 

3251 f.write(pack_checksum) 

3252 return f.write_sha() 

3253 

3254 

3255def write_pack_index_v3( 

3256 f, entries: Iterable[PackIndexEntry], pack_checksum: bytes, hash_algorithm: int = 1 

3257) -> bytes: 

3258 """Write a new pack index file in v3 format. 

3259 

3260 Args: 

3261 f: File-like object to write to 

3262 entries: List of tuples with object name (sha), offset_in_pack, and 

3263 crc32_checksum. 

3264 pack_checksum: Checksum of the pack file. 

3265 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

3266 Returns: The SHA of the index file written 

3267 """ 

3268 if hash_algorithm == 1: 

3269 hash_size = 20 # SHA-1 

3270 writer_cls = SHA1Writer 

3271 elif hash_algorithm == 2: 

3272 hash_size = 32 # SHA-256 

3273 # TODO: Add SHA256Writer when SHA-256 support is implemented 

3274 raise NotImplementedError("SHA-256 support not yet implemented") 

3275 else: 

3276 raise ValueError(f"Unknown hash algorithm {hash_algorithm}") 

3277 

3278 # Convert entries to list to allow multiple iterations 

3279 entries_list = list(entries) 

3280 

3281 # Calculate shortest unambiguous prefix length for object names 

3282 # For now, use full hash size (this could be optimized) 

3283 shortened_oid_len = hash_size 

3284 

3285 f = writer_cls(f) 

3286 f.write(b"\377tOc") # Magic! 

3287 f.write(struct.pack(">L", 3)) # Version 3 

3288 f.write(struct.pack(">L", hash_algorithm)) # Hash algorithm 

3289 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length 

3290 

3291 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3292 for name, offset, entry_checksum in entries_list: 

3293 if len(name) != hash_size: 

3294 raise ValueError( 

3295 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3296 ) 

3297 fan_out_table[ord(name[:1])] += 1 

3298 

3299 # Fan-out table 

3300 largetable: list[int] = [] 

3301 for i in range(0x100): 

3302 f.write(struct.pack(b">L", fan_out_table[i])) 

3303 fan_out_table[i + 1] += fan_out_table[i] 

3304 

3305 # Object names table 

3306 for name, offset, entry_checksum in entries_list: 

3307 f.write(name) 

3308 

3309 # CRC32 checksums table 

3310 for name, offset, entry_checksum in entries_list: 

3311 f.write(struct.pack(b">L", entry_checksum)) 

3312 

3313 # Offset table 

3314 for name, offset, entry_checksum in entries_list: 

3315 if offset < 2**31: 

3316 f.write(struct.pack(b">L", offset)) 

3317 else: 

3318 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3319 largetable.append(offset) 

3320 

3321 # Large offset table 

3322 for offset in largetable: 

3323 f.write(struct.pack(b">Q", offset)) 

3324 

3325 assert len(pack_checksum) == hash_size, ( 

3326 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}" 

3327 ) 

3328 f.write(pack_checksum) 

3329 return f.write_sha() 

3330 

3331 

3332def write_pack_index( 

3333 index_filename, entries, pack_checksum, progress=None, version=None 

3334): 

3335 """Write a pack index file. 

3336 

3337 Args: 

3338 index_filename: Index filename. 

3339 entries: List of (checksum, offset, crc32) tuples 

3340 pack_checksum: Checksum of the pack file. 

3341 progress: Progress function (not currently used) 

3342 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION. 

3343 

3344 Returns: 

3345 SHA of the written index file 

3346 """ 

3347 if version is None: 

3348 version = DEFAULT_PACK_INDEX_VERSION 

3349 

3350 if version == 1: 

3351 return write_pack_index_v1(index_filename, entries, pack_checksum) 

3352 elif version == 2: 

3353 return write_pack_index_v2(index_filename, entries, pack_checksum) 

3354 elif version == 3: 

3355 return write_pack_index_v3(index_filename, entries, pack_checksum) 

3356 else: 

3357 raise ValueError(f"Unsupported pack index version: {version}") 

3358 

3359 

3360class Pack: 

3361 """A Git pack object.""" 

3362 

3363 _data_load: Optional[Callable[[], PackData]] 

3364 _idx_load: Optional[Callable[[], PackIndex]] 

3365 

3366 _data: Optional[PackData] 

3367 _idx: Optional[PackIndex] 

3368 

3369 def __init__( 

3370 self, 

3371 basename, 

3372 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

3373 *, 

3374 delta_window_size=None, 

3375 window_memory=None, 

3376 delta_cache_size=None, 

3377 depth=None, 

3378 threads=None, 

3379 big_file_threshold=None, 

3380 ) -> None: 

3381 """Initialize a Pack object. 

3382 

3383 Args: 

3384 basename: Base path for pack files (without .pack/.idx extension) 

3385 resolve_ext_ref: Optional function to resolve external references 

3386 delta_window_size: Size of the delta compression window 

3387 window_memory: Memory limit for delta compression window 

3388 delta_cache_size: Size of the delta cache 

3389 depth: Maximum depth for delta chains 

3390 threads: Number of threads to use for operations 

3391 big_file_threshold: Size threshold for big file handling 

3392 """ 

3393 self._basename = basename 

3394 self._data = None 

3395 self._idx = None 

3396 self._idx_path = self._basename + ".idx" 

3397 self._data_path = self._basename + ".pack" 

3398 self.delta_window_size = delta_window_size 

3399 self.window_memory = window_memory 

3400 self.delta_cache_size = delta_cache_size 

3401 self.depth = depth 

3402 self.threads = threads 

3403 self.big_file_threshold = big_file_threshold 

3404 self._data_load = lambda: PackData( 

3405 self._data_path, 

3406 delta_window_size=delta_window_size, 

3407 window_memory=window_memory, 

3408 delta_cache_size=delta_cache_size, 

3409 depth=depth, 

3410 threads=threads, 

3411 big_file_threshold=big_file_threshold, 

3412 ) 

3413 self._idx_load = lambda: load_pack_index(self._idx_path) 

3414 self.resolve_ext_ref = resolve_ext_ref 

3415 

3416 @classmethod 

3417 def from_lazy_objects(cls, data_fn: Callable, idx_fn: Callable) -> "Pack": 

3418 """Create a new pack object from callables to load pack data and index objects.""" 

3419 ret = cls("") 

3420 ret._data_load = data_fn 

3421 ret._idx_load = idx_fn 

3422 return ret 

3423 

3424 @classmethod 

3425 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack": 

3426 """Create a new pack object from pack data and index objects.""" 

3427 ret = cls("") 

3428 ret._data = data 

3429 ret._data_load = None 

3430 ret._idx = idx 

3431 ret._idx_load = None 

3432 ret.check_length_and_checksum() 

3433 return ret 

3434 
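
    # A Pack can be constructed in three ways: lazily from an on-disk basename
    # (Pack("/path/pack-<sha>")), eagerly from already-open objects
    # (Pack.from_objects(data, idx)), or from loader callables
    # (Pack.from_lazy_objects(data_fn, idx_fn)). Only the eager form verifies
    # length and checksum up front; the lazy forms defer loading until the
    # `data`/`index` properties below are first accessed.
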

3435 def name(self) -> bytes: 

3436 """The SHA over the SHAs of the objects in this pack.""" 

3437 return self.index.objects_sha1() 

3438 

3439 @property 

3440 def data(self) -> PackData: 

3441 """The pack data object being used.""" 

3442 if self._data is None: 

3443 assert self._data_load 

3444 self._data = self._data_load() 

3445 self.check_length_and_checksum() 

3446 return self._data 

3447 

3448 @property 

3449 def index(self) -> PackIndex: 

3450 """The index being used. 

3451 

3452 Note: This may be an in-memory index 

3453 """ 

3454 if self._idx is None: 

3455 assert self._idx_load 

3456 self._idx = self._idx_load() 

3457 return self._idx 

3458 

3459 def close(self) -> None: 

3460 """Close the pack file and index.""" 

3461 if self._data is not None: 

3462 self._data.close() 

3463 if self._idx is not None: 

3464 self._idx.close() 

3465 

3466 def __enter__(self) -> "Pack": 

3467 """Enter context manager.""" 

3468 return self 

3469 

3470 def __exit__( 

3471 self, 

3472 exc_type: Optional[type], 

3473 exc_val: Optional[BaseException], 

3474 exc_tb: Optional[TracebackType], 

3475 ) -> None: 

3476 """Exit context manager.""" 

3477 self.close() 

3478 

3479 def __eq__(self, other: object) -> bool: 

3480 """Check equality with another pack.""" 

3481 if not isinstance(other, Pack): 

3482 return False 

3483 return self.index == other.index 

3484 

3485 def __len__(self) -> int: 

3486 """Number of entries in this pack.""" 

3487 return len(self.index) 

3488 

3489 def __repr__(self) -> str: 

3490 """Return string representation of this pack.""" 

3491 return f"{self.__class__.__name__}({self._basename!r})" 

3492 

3493 def __iter__(self) -> Iterator[bytes]: 

3494 """Iterate over all the sha1s of the objects in this pack.""" 

3495 return iter(self.index) 

3496 

3497 def check_length_and_checksum(self) -> None: 

3498 """Sanity check the length and checksum of the pack index and data.""" 

3499 assert len(self.index) == len(self.data), ( 

3500 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)" 

3501 ) 

3502 idx_stored_checksum = self.index.get_pack_checksum() 

3503 data_stored_checksum = self.data.get_stored_checksum() 

3504 if ( 

3505 idx_stored_checksum is not None 

3506 and idx_stored_checksum != data_stored_checksum 

3507 ): 

3508 raise ChecksumMismatch( 

3509 sha_to_hex(idx_stored_checksum), 

3510 sha_to_hex(data_stored_checksum), 

3511 ) 

3512 

3513 def check(self) -> None: 

3514 """Check the integrity of this pack. 

3515 

3516 Raises: 

3517 ChecksumMismatch: if a checksum for the index or data is wrong 

3518 """ 

3519 self.index.check() 

3520 self.data.check() 

3521 for obj in self.iterobjects(): 

3522 obj.check() 

3523 # TODO: object connectivity checks 

3524 

3525 def get_stored_checksum(self) -> bytes: 

3526 """Return the stored checksum of the pack data.""" 

3527 return self.data.get_stored_checksum() 

3528 

3529 def pack_tuples(self) -> list[tuple[ShaFile, None]]: 

3530 """Return pack tuples for all objects in pack.""" 

3531 return [(o, None) for o in self.iterobjects()] 

3532 

3533 def __contains__(self, sha1: bytes) -> bool: 

3534 """Check whether this pack contains a particular SHA1.""" 

3535 try: 

3536 self.index.object_offset(sha1) 

3537 return True 

3538 except KeyError: 

3539 return False 

3540 

3541 def get_raw(self, sha1: bytes) -> tuple[int, bytes]: 

3542 """Get raw object data by SHA1.""" 

3543 offset = self.index.object_offset(sha1) 

3544 obj_type, obj = self.data.get_object_at(offset) 

3545 type_num, chunks = self.resolve_object(offset, obj_type, obj) 

3546 return type_num, b"".join(chunks) 

3547 

3548 def __getitem__(self, sha1: bytes) -> ShaFile: 

3549 """Retrieve the specified SHA1.""" 

3550 type, uncomp = self.get_raw(sha1) 

3551 return ShaFile.from_raw_string(type, uncomp, sha=sha1) 

3552 

3553 def iterobjects(self) -> Iterator[ShaFile]: 

3554 """Iterate over the objects in this pack.""" 

3555 return iter( 

3556 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref) 

3557 ) 

3558 

3559 def iterobjects_subset( 

3560 self, shas: Iterable[ObjectID], *, allow_missing: bool = False 

3561 ) -> Iterator[ShaFile]: 

3562 """Iterate over a subset of objects in this pack.""" 

3563 return ( 

3564 uo 

3565 for uo in PackInflater.for_pack_subset( 

3566 self, 

3567 shas, 

3568 allow_missing=allow_missing, 

3569 resolve_ext_ref=self.resolve_ext_ref, 

3570 ) 

3571 if uo.id in shas 

3572 ) 

3573 

3574 def iter_unpacked_subset( 

3575 self, 

3576 shas: Iterable[ObjectID], 

3577 *, 

3578 include_comp: bool = False, 

3579 allow_missing: bool = False, 

3580 convert_ofs_delta: bool = False, 

3581 ) -> Iterator[UnpackedObject]: 

3582 """Iterate over unpacked objects in subset.""" 

3583 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list) 

3584 ofs: dict[int, bytes] = {} 

3585 todo = set(shas) 

3586 for unpacked in self.iter_unpacked(include_comp=include_comp): 

3587 sha = unpacked.sha() 

3588 if unpacked.offset is not None: 

3589 ofs[unpacked.offset] = sha 

3590 hexsha = sha_to_hex(sha) 

3591 if hexsha in todo: 

3592 if unpacked.pack_type_num == OFS_DELTA: 

3593 assert isinstance(unpacked.delta_base, int) 

3594 assert unpacked.offset is not None 

3595 base_offset = unpacked.offset - unpacked.delta_base 

3596 try: 

3597 unpacked.delta_base = ofs[base_offset] 

3598 except KeyError: 

3599 ofs_pending[base_offset].append(unpacked) 

3600 continue 

3601 else: 

3602 unpacked.pack_type_num = REF_DELTA 

3603 yield unpacked 

3604 todo.remove(hexsha) 

3605 if unpacked.offset is not None: 

3606 for child in ofs_pending.pop(unpacked.offset, []): 

3607 child.pack_type_num = REF_DELTA 

3608 child.delta_base = sha 

3609 yield child 

3610 assert not ofs_pending 

3611 if not allow_missing and todo: 

3612 raise UnresolvedDeltas(list(todo)) 

3613 
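
    # iter_unpacked_subset() streams the pack once, recording offset -> sha so
    # that OFS_DELTA entries can be rewritten as REF_DELTA against their base's
    # SHA. A delta seen before its base is parked in `ofs_pending` and only
    # yielded once the base offset (and therefore its SHA) has been reached.
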

3614 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]: 

3615 """Iterate over all unpacked objects in this pack.""" 

3616 ofs_to_entries = { 

3617 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries() 

3618 } 

3619 for unpacked in self.data.iter_unpacked(include_comp=include_comp): 

3620 assert unpacked.offset is not None 

3621 (sha, crc32) = ofs_to_entries[unpacked.offset] 

3622 unpacked._sha = sha 

3623 unpacked.crc32 = crc32 

3624 yield unpacked 

3625 

3626 def keep(self, msg: Optional[bytes] = None) -> str: 

3627 """Add a .keep file for the pack, preventing git from garbage collecting it. 

3628 

3629 Args: 

3630 msg: A message written inside the .keep file; can be used later 

3631 to determine whether or not a .keep file is obsolete. 

3632 Returns: The path of the .keep file, as a string. 

3633 """ 

3634 keepfile_name = f"{self._basename}.keep" 

3635 with GitFile(keepfile_name, "wb") as keepfile: 

3636 if msg: 

3637 keepfile.write(msg) 

3638 keepfile.write(b"\n") 

3639 return keepfile_name 

3640 

3641 def get_ref(self, sha: bytes) -> tuple[Optional[int], int, OldUnpackedObject]: 

3642 """Get the object for a ref SHA, only looking in this pack.""" 

3643 # TODO: cache these results 

3644 try: 

3645 offset = self.index.object_offset(sha) 

3646 except KeyError: 

3647 offset = None 

3648 if offset: 

3649 type, obj = self.data.get_object_at(offset) 

3650 elif self.resolve_ext_ref: 

3651 type, obj = self.resolve_ext_ref(sha) 

3652 else: 

3653 raise KeyError(sha) 

3654 return offset, type, obj 

3655 

3656 def resolve_object( 

3657 self, offset: int, type: int, obj, get_ref=None 

3658 ) -> tuple[int, Iterable[bytes]]: 

3659 """Resolve an object, possibly resolving deltas when necessary. 

3660 

3661 Returns: Tuple with object type and contents. 

3662 """ 

3663 # Walk down the delta chain, building a stack of deltas to reach 

3664 # the requested object. 

3665 base_offset = offset 

3666 base_type = type 

3667 base_obj = obj 

3668 delta_stack = [] 

3669 while base_type in DELTA_TYPES: 

3670 prev_offset = base_offset 

3671 if get_ref is None: 

3672 get_ref = self.get_ref 

3673 if base_type == OFS_DELTA: 

3674 (delta_offset, delta) = base_obj 

3675 # TODO: clean up asserts and replace with nicer error messages 

3676 base_offset = base_offset - delta_offset 

3677 base_type, base_obj = self.data.get_object_at(base_offset) 

3678 assert isinstance(base_type, int) 

3679 elif base_type == REF_DELTA: 

3680 (basename, delta) = base_obj 

3681 assert isinstance(basename, bytes) and len(basename) == 20 

3682 base_offset, base_type, base_obj = get_ref(basename) 

3683 assert isinstance(base_type, int) 

3684 if base_offset == prev_offset: # object is based on itself 

3685 raise UnresolvedDeltas([basename]) 

3686 delta_stack.append((prev_offset, base_type, delta)) 

3687 

3688 # Now grab the base object (mustn't be a delta) and apply the 

3689 # deltas all the way up the stack. 

3690 chunks = base_obj 

3691 for prev_offset, _delta_type, delta in reversed(delta_stack): 

3692 # Convert chunks to bytes for apply_delta if needed 

3693 if isinstance(chunks, list): 

3694 chunks_bytes = b"".join(chunks) 

3695 elif isinstance(chunks, tuple): 

3696 # For tuple type, second element is the actual data 

3697 _, chunk_data = chunks 

3698 if isinstance(chunk_data, list): 

3699 chunks_bytes = b"".join(chunk_data) 

3700 else: 

3701 chunks_bytes = chunk_data 

3702 else: 

3703 chunks_bytes = chunks 

3704 

3705 # Apply delta and get result as list 

3706 chunks = apply_delta(chunks_bytes, delta) 

3707 

3708 if prev_offset is not None: 

3709 self.data._offset_cache[prev_offset] = base_type, chunks 

3710 return base_type, chunks 

3711 
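
    # resolve_object() walks an arbitrarily deep delta chain -- OFS_DELTA bases
    # by relative offset within this pack, REF_DELTA bases by SHA via
    # get_ref()/resolve_ext_ref -- then replays the collected deltas from the
    # base outwards with apply_delta(), caching each intermediate result in the
    # pack data's offset cache.
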

3712 def entries( 

3713 self, progress: Optional[ProgressFn] = None 

3714 ) -> Iterator[PackIndexEntry]: 

3715 """Yield entries summarizing the contents of this pack. 

3716 

3717 Args: 

3718 progress: Progress function, called with current and total 

3719 object count. 

3720 Returns: iterator of tuples with (sha, offset, crc32) 

3721 """ 

3722 return self.data.iterentries( 

3723 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3724 ) 

3725 

3726 def sorted_entries( 

3727 self, progress: Optional[ProgressFn] = None 

3728 ) -> Iterator[PackIndexEntry]: 

3729 """Return entries in this pack, sorted by SHA. 

3730 

3731 Args: 

3732 progress: Progress function, called with current and total 

3733 object count 

3734 Returns: Iterator of tuples with (sha, offset, crc32) 

3735 """ 

3736 return iter( 

3737 self.data.sorted_entries( 

3738 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3739 ) 

3740 ) 

3741 

3742 def get_unpacked_object( 

3743 self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True 

3744 ) -> UnpackedObject: 

3745 """Get the unpacked object for a sha. 

3746 

3747 Args: 

3748 sha: SHA of object to fetch 

3749 include_comp: Whether to include compression data in UnpackedObject 

3750 convert_ofs_delta: Whether to convert offset deltas to ref deltas 

3751 """ 

3752 offset = self.index.object_offset(sha) 

3753 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp) 

3754 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta: 

3755 assert isinstance(unpacked.delta_base, int) 

3756 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base) 

3757 unpacked.pack_type_num = REF_DELTA 

3758 return unpacked 

3759 

3760 
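
# A minimal read-only usage sketch (hypothetical helper, not a dulwich API) for
# the Pack class above. The basename and helper name are assumptions; the
# basename must point at matching .pack/.idx files.
def _example_read_pack(basename: str, wanted_sha: bytes) -> None:
    """Hypothetical sketch: membership test, raw retrieval and a .keep marker."""
    with Pack(basename) as pack:
        checksum = sha_to_hex(pack.get_stored_checksum()).decode("ascii")
        print(f"{len(pack)} objects, pack checksum {checksum}")
        if wanted_sha in pack:
            type_num, raw = pack.get_raw(wanted_sha)
            print(f"object type {type_num}, {len(raw)} bytes")
        # Mark the pack as precious so it will not be garbage collected.
        pack.keep(b"kept by _example_read_pack")
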

3761def extend_pack( 

3762 f: BinaryIO, 

3763 object_ids: set[ObjectID], 

3764 get_raw, 

3765 *, 

3766 compression_level=-1, 

3767 progress=None, 

3768) -> tuple[bytes, list]: 

3769 """Extend a pack file with more objects. 

3770 

3771 The caller should make sure that object_ids does not contain any objects 

3772 that are already in the pack. 

3773 """ 

3774 # Update the header with the new number of objects. 

3775 f.seek(0) 

3776 _version, num_objects = read_pack_header(f.read) 

3777 

3778 if object_ids: 

3779 f.seek(0) 

3780 write_pack_header(f.write, num_objects + len(object_ids)) 

3781 

3782 # Must flush before reading (http://bugs.python.org/issue3207) 

3783 f.flush() 

3784 

3785 # Rescan the rest of the pack, computing the SHA with the new header. 

3786 new_sha = compute_file_sha(f, end_ofs=-20) 

3787 

3788 # Must reposition before writing (http://bugs.python.org/issue3207) 

3789 f.seek(0, os.SEEK_CUR) 

3790 

3791 extra_entries = [] 

3792 

3793 # Complete the pack. 

3794 for i, object_id in enumerate(object_ids): 

3795 if progress is not None: 

3796 progress( 

3797 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii") 

3798 ) 

3799 assert len(object_id) == 20 

3800 type_num, data = get_raw(object_id) 

3801 offset = f.tell() 

3802 crc32 = write_pack_object( 

3803 f.write, 

3804 type_num, 

3805 data, 

3806 sha=new_sha, 

3807 compression_level=compression_level, 

3808 ) 

3809 extra_entries.append((object_id, offset, crc32)) 

3810 pack_sha = new_sha.digest() 

3811 f.write(pack_sha) 

3812 return pack_sha, extra_entries 

3813 

3814 
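
# A minimal usage sketch (hypothetical helper, not a dulwich API) for
# extend_pack() above. `object_store` stands in for anything exposing
# get_raw(sha) -> (type_num, data); the path, helper name and store are
# assumptions.
def _example_extend_pack(pack_path: str, missing: set[bytes], object_store) -> None:
    """Hypothetical sketch: append `missing` (20-byte SHAs) to an existing pack."""
    with open(pack_path, "r+b") as f:
        pack_sha, extra_entries = extend_pack(f, missing, object_store.get_raw)
    # extra_entries holds (object_id, offset, crc32) tuples for the appended
    # objects; the caller would normally fold them into a rewritten index.
    print(sha_to_hex(pack_sha).decode("ascii"), len(extra_entries))
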

3815try: 

3816 from dulwich._pack import ( # type: ignore 

3817 apply_delta, # type: ignore 

3818 bisect_find_sha, # type: ignore 

3819 ) 

3820except ImportError: 

3821 pass
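

# A minimal sketch (hypothetical helper, not a dulwich API) of the delta
# primitives used by Pack.resolve_object() above, assuming create_delta()
# (defined earlier in this module) yields delta chunks for a base/target pair
# and apply_delta() returns the reconstructed target as a list of chunks.
def _example_delta_roundtrip() -> None:
    """Hypothetical sketch: deltify a buffer against a base and reconstruct it."""
    base = b"hello world\n" * 100
    target = b"hello world\n" * 99 + b"goodbye world\n"
    delta = b"".join(create_delta(base, target))
    assert b"".join(apply_delta(base, delta)) == target
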