Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%

1608 statements  

1# pack.py -- For dealing with packed git objects. 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Classes for dealing with packed git objects. 

24 

25A pack is a compact representation of a bunch of objects, stored 

26using deltas where possible. 

27 

28A pack has two parts: the pack file, which stores the data, and an index 

29that tells you where the data is. 

30 

31To find an object you look in each of the index files until you find a 

32match for the object name. You then use the offset obtained from the index 

33as a pointer into the corresponding pack file. 

34""" 

35 

36import binascii 

37from collections import defaultdict, deque 

38from contextlib import suppress 

39from io import BytesIO, UnsupportedOperation 

40 

41try: 

42 from cdifflib import CSequenceMatcher as SequenceMatcher 

43except ModuleNotFoundError: 

44 from difflib import SequenceMatcher 

45 

46import os 

47import struct 

48import sys 

49import warnings 

50import zlib 

51from collections.abc import Iterable, Iterator, Sequence 

52from hashlib import sha1 

53from itertools import chain 

54from os import SEEK_CUR, SEEK_END 

55from struct import unpack_from 

56from types import TracebackType 

57from typing import ( 

58 IO, 

59 TYPE_CHECKING, 

60 Any, 

61 BinaryIO, 

62 Callable, 

63 Generic, 

64 Optional, 

65 Protocol, 

66 TypeVar, 

67 Union, 

68 cast, 

69) 

70 

71try: 

72 import mmap 

73except ImportError: 

74 has_mmap = False 

75else: 

76 has_mmap = True 

77 

78if TYPE_CHECKING: 

79 from _hashlib import HASH as HashObject 

80 

81 from .commit_graph import CommitGraph 

82 

83# For some reason the above try/except fails to set has_mmap = False on Plan 9 

84if sys.platform == "Plan9": 

85 has_mmap = False 

86 

87from . import replace_me 

88from .errors import ApplyDeltaError, ChecksumMismatch 

89from .file import GitFile, _GitFile 

90from .lru_cache import LRUSizeCache 

91from .objects import ObjectID, ShaFile, hex_to_sha, object_header, sha_to_hex 

92 

93OFS_DELTA = 6 

94REF_DELTA = 7 

95 

96DELTA_TYPES = (OFS_DELTA, REF_DELTA) 

97 

98 

99DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 

100 

101# Keep pack files under 16Mb in memory, otherwise write them out to disk 

102PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 

103 

104# Default pack index version to use when none is specified 

105DEFAULT_PACK_INDEX_VERSION = 2 

106 

107 

108OldUnpackedObject = Union[tuple[Union[bytes, int], list[bytes]], list[bytes]] 

109ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]] 

110ProgressFn = Callable[[int, str], None] 

111PackHint = tuple[int, Optional[bytes]] 

112 

113 

114class UnresolvedDeltas(Exception): 

115 """Delta objects could not be resolved.""" 

116 

117 def __init__(self, shas: list[bytes]) -> None: 

118 """Initialize UnresolvedDeltas exception. 

119 

120 Args: 

121 shas: List of SHA hashes for unresolved delta objects 

122 """ 

123 self.shas = shas 

124 

125 

126class ObjectContainer(Protocol): 

127 """Protocol for objects that can contain git objects.""" 

128 

129 def add_object(self, obj: ShaFile) -> None: 

130 """Add a single object to this object store.""" 

131 

132 def add_objects( 

133 self, 

134 objects: Sequence[tuple[ShaFile, Optional[str]]], 

135 progress: Optional[Callable[[str], None]] = None, 

136 ) -> Optional["Pack"]: 

137 """Add a set of objects to this object store. 

138 

139 Args: 

140 objects: Iterable over a list of (object, path) tuples 

141 progress: Progress callback for object insertion 

142 Returns: Optional Pack object of the objects written. 

143 """ 

144 

145 def __contains__(self, sha1: bytes) -> bool: 

146 """Check if a hex sha is present.""" 

147 

148 def __getitem__(self, sha1: bytes) -> ShaFile: 

149 """Retrieve an object.""" 

150 

151 def get_commit_graph(self) -> Optional["CommitGraph"]: 

152 """Get the commit graph for this object store. 

153 

154 Returns: 

155 CommitGraph object if available, None otherwise 

156 """ 

157 return None 

158 

159 

160class PackedObjectContainer(ObjectContainer): 

161 """Container for objects packed in a pack file.""" 

162 

163 def get_unpacked_object( 

164 self, sha1: bytes, *, include_comp: bool = False 

165 ) -> "UnpackedObject": 

166 """Get a raw unresolved object. 

167 

168 Args: 

169 sha1: SHA-1 hash of the object 

170 include_comp: Whether to include compressed data 

171 

172 Returns: 

173 UnpackedObject instance 

174 """ 

175 raise NotImplementedError(self.get_unpacked_object) 

176 

177 def iterobjects_subset( 

178 self, shas: Iterable[bytes], *, allow_missing: bool = False 

179 ) -> Iterator[ShaFile]: 

180 """Iterate over a subset of objects. 

181 

182 Args: 

183 shas: Iterable of object SHAs to retrieve 

184 allow_missing: If True, skip missing objects 

185 

186 Returns: 

187 Iterator of ShaFile objects 

188 """ 

189 raise NotImplementedError(self.iterobjects_subset) 

190 

191 def iter_unpacked_subset( 

192 self, 

193 shas: set[bytes], 

194 include_comp: bool = False, 

195 allow_missing: bool = False, 

196 convert_ofs_delta: bool = True, 

197 ) -> Iterator["UnpackedObject"]: 

198 """Iterate over unpacked objects from a subset of SHAs. 

199 

200 Args: 

201 shas: Set of object SHAs to retrieve 

202 include_comp: Include compressed data if True 

203 allow_missing: If True, skip missing objects 

204 convert_ofs_delta: If True, convert offset deltas to ref deltas 

205 

206 Returns: 

207 Iterator of UnpackedObject instances 

208 """ 

209 raise NotImplementedError(self.iter_unpacked_subset) 

210 

211 

212class UnpackedObjectStream: 

213 """Abstract base class for a stream of unpacked objects.""" 

214 

215 def __iter__(self) -> Iterator["UnpackedObject"]: 

216 """Iterate over unpacked objects.""" 

217 raise NotImplementedError(self.__iter__) 

218 

219 def __len__(self) -> int: 

220 """Return the number of objects in the stream.""" 

221 raise NotImplementedError(self.__len__) 

222 

223 

224def take_msb_bytes( 

225 read: Callable[[int], bytes], crc32: Optional[int] = None 

226) -> tuple[list[int], Optional[int]]: 

227 """Read bytes marked with most significant bit. 

228 

229 Args: 

230 read: Read function 

231 crc32: Optional CRC32 checksum to update 

232 

233 Returns: 

234 Tuple of (list of bytes read, updated CRC32 or None) 

235 """ 

236 ret: list[int] = [] 

237 while len(ret) == 0 or ret[-1] & 0x80: 

238 b = read(1) 

239 if crc32 is not None: 

240 crc32 = binascii.crc32(b, crc32) 

241 ret.append(ord(b[:1])) 

242 return ret, crc32 
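
A small sketch of the encoding this reads (assuming the function above is imported from dulwich.pack): the first byte below has its most significant bit set, so reading continues until a byte with the bit clear appears.

    from io import BytesIO

    from dulwich.pack import take_msb_bytes

    buf = BytesIO(bytes([0x91, 0x2E]))  # 0x91 has the MSB set, 0x2E does not
    values, crc = take_msb_bytes(buf.read)
    assert values == [0x91, 0x2E] and crc is None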

243 

244 

245class PackFileDisappeared(Exception): 

246 """Raised when a pack file unexpectedly disappears.""" 

247 

248 def __init__(self, obj: object) -> None: 

249 """Initialize PackFileDisappeared exception. 

250 

251 Args: 

252 obj: The object that triggered the exception 

253 """ 

254 self.obj = obj 

255 

256 

257class UnpackedObject: 

258 """Class encapsulating an object unpacked from a pack file. 

259 

260 These objects should only be created from within unpack_object. Most 

261 members start out as empty and are filled in at various points by 

262 read_zlib_chunks, unpack_object, DeltaChainIterator, etc. 

263 

264 End users of this object should take care that the function they're getting 

265 this object from is guaranteed to set the members they need. 

266 """ 

267 

268 __slots__ = [ 

269 "_sha", # Cached binary SHA. 

270 "comp_chunks", # Compressed object chunks. 

271 "crc32", # CRC32. 

272 "decomp_chunks", # Decompressed object chunks. 

273 "decomp_len", # Decompressed length of this object. 

274 "delta_base", # Delta base offset or SHA. 

275 "obj_chunks", # Decompressed and delta-resolved chunks. 

276 "obj_type_num", # Type of this object. 

277 "offset", # Offset in its pack. 

278 "pack_type_num", # Type of this object in the pack (may be a delta). 

279 ] 

280 

281 obj_type_num: Optional[int] 

282 obj_chunks: Optional[list[bytes]] 

283 delta_base: Union[None, bytes, int] 

284 decomp_chunks: list[bytes] 

285 comp_chunks: Optional[list[bytes]] 

286 decomp_len: Optional[int] 

287 crc32: Optional[int] 

288 offset: Optional[int] 

289 pack_type_num: int 

290 _sha: Optional[bytes] 

291 

292 # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be 

293 # methods of this object. 

294 def __init__( 

295 self, 

296 pack_type_num: int, 

297 *, 

298 delta_base: Union[None, bytes, int] = None, 

299 decomp_len: Optional[int] = None, 

300 crc32: Optional[int] = None, 

301 sha: Optional[bytes] = None, 

302 decomp_chunks: Optional[list[bytes]] = None, 

303 offset: Optional[int] = None, 

304 ) -> None: 

305 """Initialize an UnpackedObject. 

306 

307 Args: 

308 pack_type_num: Type number of this object in the pack 

309 delta_base: Delta base (offset or SHA) if this is a delta object 

310 decomp_len: Decompressed length of this object 

311 crc32: CRC32 checksum 

312 sha: SHA-1 hash of the object 

313 decomp_chunks: Decompressed chunks 

314 offset: Offset in the pack file 

315 """ 

316 self.offset = offset 

317 self._sha = sha 

318 self.pack_type_num = pack_type_num 

319 self.delta_base = delta_base 

320 self.comp_chunks = None 

321 self.decomp_chunks: list[bytes] = decomp_chunks or [] 

322 if decomp_chunks is not None and decomp_len is None: 

323 self.decomp_len = sum(map(len, decomp_chunks)) 

324 else: 

325 self.decomp_len = decomp_len 

326 self.crc32 = crc32 

327 

328 if pack_type_num in DELTA_TYPES: 

329 self.obj_type_num = None 

330 self.obj_chunks = None 

331 else: 

332 self.obj_type_num = pack_type_num 

333 self.obj_chunks = self.decomp_chunks 

334 self.delta_base = delta_base 

335 

336 def sha(self) -> bytes: 

337 """Return the binary SHA of this object.""" 

338 if self._sha is None: 

339 assert self.obj_type_num is not None and self.obj_chunks is not None 

340 self._sha = obj_sha(self.obj_type_num, self.obj_chunks) 

341 return self._sha 

342 

343 def sha_file(self) -> ShaFile: 

344 """Return a ShaFile from this object.""" 

345 assert self.obj_type_num is not None and self.obj_chunks is not None 

346 return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks) 

347 

348 # Only provided for backwards compatibility with code that expects either 

349 # chunks or a delta tuple. 

350 def _obj(self) -> OldUnpackedObject: 

351 """Return the decompressed chunks, or (delta base, delta chunks).""" 

352 if self.pack_type_num in DELTA_TYPES: 

353 assert isinstance(self.delta_base, (bytes, int)) 

354 return (self.delta_base, self.decomp_chunks) 

355 else: 

356 return self.decomp_chunks 

357 

358 def __eq__(self, other: object) -> bool: 

359 """Check equality with another UnpackedObject.""" 

360 if not isinstance(other, UnpackedObject): 

361 return False 

362 for slot in self.__slots__: 

363 if getattr(self, slot) != getattr(other, slot): 

364 return False 

365 return True 

366 

367 def __ne__(self, other: object) -> bool: 

368 """Check inequality with another UnpackedObject.""" 

369 return not (self == other) 

370 

371 def __repr__(self) -> str: 

372 """Return string representation of this UnpackedObject.""" 

373 data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__] 

374 return "{}({})".format(self.__class__.__name__, ", ".join(data)) 

375 

376 

377_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance 

378 

379 

380def read_zlib_chunks( 

381 read_some: Callable[[int], bytes], 

382 unpacked: UnpackedObject, 

383 include_comp: bool = False, 

384 buffer_size: int = _ZLIB_BUFSIZE, 

385) -> bytes: 

386 """Read zlib data from a buffer. 

387 

388 This function requires that the buffer have additional data following the 

389 compressed data, which is guaranteed to be the case for git pack files. 

390 

391 Args: 

392 read_some: Read function that returns at least one byte, but may 

393 return less than the requested size. 

394 unpacked: An UnpackedObject to write result data to. If its crc32 

395 attr is not None, the CRC32 of the compressed bytes will be computed 

396 using this starting CRC32. 

397 After this function, will have the following attrs set: 

398 * comp_chunks (if include_comp is True) 

399 * decomp_chunks 

400 * decomp_len 

401 * crc32 

402 include_comp: If True, include compressed data in the result. 

403 buffer_size: Size of the read buffer. 

404 Returns: Leftover unused data from the decompression. 

405 

406 Raises: 

407 zlib.error: if a decompression error occurred. 

408 """ 

409 if unpacked.decomp_len is None or unpacked.decomp_len <= -1: 

410 raise ValueError("non-negative zlib data stream size expected") 

411 decomp_obj = zlib.decompressobj() 

412 

413 comp_chunks = [] 

414 decomp_chunks = unpacked.decomp_chunks 

415 decomp_len = 0 

416 crc32 = unpacked.crc32 

417 

418 while True: 

419 add = read_some(buffer_size) 

420 if not add: 

421 raise zlib.error("EOF before end of zlib stream") 

422 comp_chunks.append(add) 

423 decomp = decomp_obj.decompress(add) 

424 decomp_len += len(decomp) 

425 decomp_chunks.append(decomp) 

426 unused = decomp_obj.unused_data 

427 if unused: 

428 left = len(unused) 

429 if crc32 is not None: 

430 crc32 = binascii.crc32(add[:-left], crc32) 

431 if include_comp: 

432 comp_chunks[-1] = add[:-left] 

433 break 

434 elif crc32 is not None: 

435 crc32 = binascii.crc32(add, crc32) 

436 if crc32 is not None: 

437 crc32 &= 0xFFFFFFFF 

438 

439 if decomp_len != unpacked.decomp_len: 

440 raise zlib.error("decompressed data does not match expected size") 

441 

442 unpacked.crc32 = crc32 

443 if include_comp: 

444 unpacked.comp_chunks = comp_chunks 

445 return unused 
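
A minimal sketch of calling this outside a real pack (the trailing bytes stand in for whatever follows the compressed object in a pack file; 3 is the blob type number):

    import zlib
    from io import BytesIO

    from dulwich.pack import UnpackedObject, read_zlib_chunks

    payload = b"hello"
    stream = BytesIO(zlib.compress(payload) + b"trailing data")
    unpacked = UnpackedObject(3, decomp_len=len(payload))
    leftover = read_zlib_chunks(stream.read, unpacked)
    assert b"".join(unpacked.decomp_chunks) == payload
    assert leftover == b"trailing data"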

446 

447 

448def iter_sha1(iter: Iterable[bytes]) -> bytes: 

449 """Return the hexdigest of the SHA1 over a set of names. 

450 

451 Args: 

452 iter: Iterator over string objects 

453 Returns: 40-byte hex sha1 digest 

454 """ 

455 sha = sha1() 

456 for name in iter: 

457 sha.update(name) 

458 return sha.hexdigest().encode("ascii") 
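
For instance (a sketch with arbitrary names), hashing two names yields the same digest as hashing their concatenation:

    from hashlib import sha1

    from dulwich.pack import iter_sha1

    expected = sha1(b"a" * 20 + b"b" * 20).hexdigest().encode("ascii")
    assert iter_sha1([b"a" * 20, b"b" * 20]) == expected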

459 

460 

461def load_pack_index(path: Union[str, os.PathLike]) -> "PackIndex": 

462 """Load an index file by path. 

463 

464 Args: 

465 path: Path to the index file 

466 Returns: A PackIndex loaded from the given path 

467 """ 

468 with GitFile(path, "rb") as f: 

469 return load_pack_index_file(path, f) 

470 

471 

472def _load_file_contents( 

473 f: Union[IO[bytes], _GitFile], size: Optional[int] = None 

474) -> tuple[Union[bytes, Any], int]: 

475 """Load contents from a file, preferring mmap when possible. 

476 

477 Args: 

478 f: File-like object to load 

479 size: Expected size, or None to determine from file 

480 Returns: Tuple of (contents, size) 

481 """ 

482 try: 

483 fd = f.fileno() 

484 except (UnsupportedOperation, AttributeError): 

485 fd = None 

486 # Attempt to use mmap if possible 

487 if fd is not None: 

488 if size is None: 

489 size = os.fstat(fd).st_size 

490 if has_mmap: 

491 try: 

492 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ) 

493 except (OSError, ValueError): 

494 # Can't mmap - perhaps a socket or invalid file descriptor 

495 pass 

496 else: 

497 return contents, size 

498 contents_bytes = f.read() 

499 size = len(contents_bytes) 

500 return contents_bytes, size 

501 

502 

503def load_pack_index_file( 

504 path: Union[str, os.PathLike], f: Union[IO[bytes], _GitFile] 

505) -> "PackIndex": 

506 """Load an index file from a file-like object. 

507 

508 Args: 

509 path: Path for the index file 

510 f: File-like object 

511 Returns: A PackIndex loaded from the given file 

512 """ 

513 contents, size = _load_file_contents(f) 

514 if contents[:4] == b"\377tOc": 

515 version = struct.unpack(b">L", contents[4:8])[0] 

516 if version == 2: 

517 return PackIndex2(path, file=f, contents=contents, size=size) 

518 elif version == 3: 

519 return PackIndex3(path, file=f, contents=contents, size=size) 

520 else: 

521 raise KeyError(f"Unknown pack index format {version}") 

522 else: 

523 return PackIndex1(path, file=f, contents=contents, size=size) 

524 

525 

526def bisect_find_sha( 

527 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes] 

528) -> Optional[int]: 

529 """Find a SHA in a data blob with sorted SHAs. 

530 

531 Args: 

532 start: Start index of range to search 

533 end: End index of range to search 

534 sha: Sha to find 

535 unpack_name: Callback to retrieve SHA by index 

536 Returns: Index of the SHA, or None if it wasn't found 

537 """ 

538 assert start <= end 

539 while start <= end: 

540 i = (start + end) // 2 

541 file_sha = unpack_name(i) 

542 if file_sha < sha: 

543 start = i + 1 

544 elif file_sha > sha: 

545 end = i - 1 

546 else: 

547 return i 

548 return None 
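
A small sketch with fabricated 20-byte names shows the callback-based search:

    from dulwich.pack import bisect_find_sha

    names = [bytes([i]) * 20 for i in range(8)]  # already sorted
    assert bisect_find_sha(0, len(names) - 1, bytes([5]) * 20, names.__getitem__) == 5
    assert bisect_find_sha(0, len(names) - 1, b"\xff" * 20, names.__getitem__) is None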

549 

550 

551PackIndexEntry = tuple[bytes, int, Optional[int]] 

552 

553 

554class PackIndex: 

555 """An index into a packfile. 

556 

557 Given the sha id of an object, a pack index can tell you the location in the 

558 packfile of that object, if it has it. 

559 """ 

560 

561 # Default to SHA-1 for backward compatibility 

562 hash_algorithm = 1 

563 hash_size = 20 

564 

565 def __eq__(self, other: object) -> bool: 

566 """Check equality with another PackIndex.""" 

567 if not isinstance(other, PackIndex): 

568 return False 

569 

570 for (name1, _, _), (name2, _, _) in zip( 

571 self.iterentries(), other.iterentries() 

572 ): 

573 if name1 != name2: 

574 return False 

575 return True 

576 

577 def __ne__(self, other: object) -> bool: 

578 """Check if this pack index is not equal to another.""" 

579 return not self.__eq__(other) 

580 

581 def __len__(self) -> int: 

582 """Return the number of entries in this pack index.""" 

583 raise NotImplementedError(self.__len__) 

584 

585 def __iter__(self) -> Iterator[bytes]: 

586 """Iterate over the SHAs in this pack.""" 

587 return map(sha_to_hex, self._itersha()) 

588 

589 def iterentries(self) -> Iterator[PackIndexEntry]: 

590 """Iterate over the entries in this pack index. 

591 

592 Returns: iterator over tuples with object name, offset in packfile and 

593 crc32 checksum. 

594 """ 

595 raise NotImplementedError(self.iterentries) 

596 

597 def get_pack_checksum(self) -> Optional[bytes]: 

598 """Return the SHA1 checksum stored for the corresponding packfile. 

599 

600 Returns: 20-byte binary digest, or None if not available 

601 """ 

602 raise NotImplementedError(self.get_pack_checksum) 

603 

604 @replace_me(since="0.21.0", remove_in="0.23.0") 

605 def object_index(self, sha: bytes) -> int: 

606 """Return the index for the given SHA. 

607 

608 Args: 

609 sha: SHA-1 hash 

610 

611 Returns: 

612 Index position 

613 """ 

614 return self.object_offset(sha) 

615 

616 def object_offset(self, sha: bytes) -> int: 

617 """Return the offset into the corresponding packfile for the object. 

618 

619 Given the name of an object it will return the offset that object 

620 lives at within the corresponding pack file. If the pack file doesn't 

621 have the object then a KeyError is raised. 

622 """ 

623 raise NotImplementedError(self.object_offset) 

624 

625 def object_sha1(self, index: int) -> bytes: 

626 """Return the SHA1 corresponding to the index in the pack file.""" 

627 for name, offset, _crc32 in self.iterentries(): 

628 if offset == index: 

629 return name 

630 else: 

631 raise KeyError(index) 

632 

633 def _object_offset(self, sha: bytes) -> int: 

634 """See object_offset. 

635 

636 Args: 

637 sha: A *binary* SHA string (20 bytes long). 

638 """ 

639 raise NotImplementedError(self._object_offset) 

640 

641 def objects_sha1(self) -> bytes: 

642 """Return the hex SHA1 over all the shas of all objects in this pack. 

643 

644 Note: This is used for the filename of the pack. 

645 """ 

646 return iter_sha1(self._itersha()) 

647 

648 def _itersha(self) -> Iterator[bytes]: 

649 """Yield all the SHA1's of the objects in the index, sorted.""" 

650 raise NotImplementedError(self._itersha) 

651 

652 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

653 """Iterate over all SHA1s with the given prefix. 

654 

655 Args: 

656 prefix: Binary prefix to match 

657 Returns: Iterator of matching SHA1s 

658 """ 

659 # Default implementation for PackIndex classes that don't override 

660 for sha, _, _ in self.iterentries(): 

661 if sha.startswith(prefix): 

662 yield sha 

663 

664 def close(self) -> None: 

665 """Close any open files.""" 

666 

667 def check(self) -> None: 

668 """Check the consistency of this pack index.""" 

669 

670 

671class MemoryPackIndex(PackIndex): 

672 """Pack index that is stored entirely in memory.""" 

673 

674 def __init__( 

675 self, 

676 entries: list[tuple[bytes, int, Optional[int]]], 

677 pack_checksum: Optional[bytes] = None, 

678 ) -> None: 

679 """Create a new MemoryPackIndex. 

680 

681 Args: 

682 entries: Sequence of name, idx, crc32 (sorted) 

683 pack_checksum: Optional pack checksum 

684 """ 

685 self._by_sha = {} 

686 self._by_offset = {} 

687 for name, offset, _crc32 in entries: 

688 self._by_sha[name] = offset 

689 self._by_offset[offset] = name 

690 self._entries = entries 

691 self._pack_checksum = pack_checksum 

692 

693 def get_pack_checksum(self) -> Optional[bytes]: 

694 """Return the SHA checksum stored for the corresponding packfile.""" 

695 return self._pack_checksum 

696 

697 def __len__(self) -> int: 

698 """Return the number of entries in this pack index.""" 

699 return len(self._entries) 

700 

701 def object_offset(self, sha: bytes) -> int: 

702 """Return the offset for the given SHA. 

703 

704 Args: 

705 sha: SHA to look up (binary or hex) 

706 Returns: Offset in the pack file 

707 """ 

708 if len(sha) == 40: 

709 sha = hex_to_sha(sha) 

710 return self._by_sha[sha] 

711 

712 def object_sha1(self, offset: int) -> bytes: 

713 """Return the SHA1 for the object at the given offset.""" 

714 return self._by_offset[offset] 

715 

716 def _itersha(self) -> Iterator[bytes]: 

717 """Iterate over all SHA1s in the index.""" 

718 return iter(self._by_sha) 

719 

720 def iterentries(self) -> Iterator[PackIndexEntry]: 

721 """Iterate over all index entries.""" 

722 return iter(self._entries) 

723 

724 @classmethod 

725 def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex": 

726 """Create a MemoryPackIndex from a PackData object.""" 

727 return MemoryPackIndex( 

728 list(pack_data.sorted_entries()), pack_data.get_stored_checksum() 

729 ) 

730 

731 @classmethod 

732 def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex": 

733 """Create a copy of another PackIndex in memory.""" 

734 return cls(list(other_index.iterentries()), other_index.get_pack_checksum()) 

735 

736 

737class FilePackIndex(PackIndex): 

738 """Pack index that is based on a file. 

739 

740 To do the loop it opens the file, and indexes first 256 4 byte groups 

741 with the first byte of the sha id. The value in the four byte group indexed 

742 is the end of the group that shares the same starting byte. Subtract one 

743 from the starting byte and index again to find the start of the group. 

744 The values are sorted by sha id within the group, so do the math to find 

745 the start and end offset and then bisect in to find if the value is 

746 present. 

747 """ 

748 

749 _fan_out_table: list[int] 

750 _file: Union[IO[bytes], _GitFile] 

751 

752 def __init__( 

753 self, 

754 filename: Union[str, os.PathLike], 

755 file: Optional[Union[IO[bytes], _GitFile]] = None, 

756 contents: Optional[Union[bytes, "mmap.mmap"]] = None, 

757 size: Optional[int] = None, 

758 ) -> None: 

759 """Create a pack index object. 

760 

761 Provide it with the name of the index file to consider, and it will map 

762 it whenever required. 

763 """ 

764 self._filename = filename 

765 # Take the size now, so it can be checked each time we map the file to 

766 # ensure that it hasn't changed. 

767 if file is None: 

768 self._file = GitFile(filename, "rb") 

769 else: 

770 self._file = file 

771 if contents is None: 

772 self._contents, self._size = _load_file_contents(self._file, size) 

773 else: 

774 self._contents = contents 

775 self._size = size if size is not None else len(contents) 

776 

777 @property 

778 def path(self) -> str: 

779 """Return the path to this index file.""" 

780 return os.fspath(self._filename) 

781 

782 def __eq__(self, other: object) -> bool: 

783 """Check equality with another FilePackIndex.""" 

784 # Quick optimization: 

785 if ( 

786 isinstance(other, FilePackIndex) 

787 and self._fan_out_table != other._fan_out_table 

788 ): 

789 return False 

790 

791 return super().__eq__(other) 

792 

793 def close(self) -> None: 

794 """Close the underlying file and any mmap.""" 

795 self._file.close() 

796 close_fn = getattr(self._contents, "close", None) 

797 if close_fn is not None: 

798 close_fn() 

799 

800 def __len__(self) -> int: 

801 """Return the number of entries in this pack index.""" 

802 return self._fan_out_table[-1] 

803 

804 def _unpack_entry(self, i: int) -> PackIndexEntry: 

805 """Unpack the i-th entry in the index file. 

806 

807 Returns: Tuple with object name (SHA), offset in pack file and CRC32 

808 checksum (if known). 

809 """ 

810 raise NotImplementedError(self._unpack_entry) 

811 

812 def _unpack_name(self, i: int) -> bytes: 

813 """Unpack the i-th name from the index file.""" 

814 raise NotImplementedError(self._unpack_name) 

815 

816 def _unpack_offset(self, i: int) -> int: 

817 """Unpack the i-th object offset from the index file.""" 

818 raise NotImplementedError(self._unpack_offset) 

819 

820 def _unpack_crc32_checksum(self, i: int) -> Optional[int]: 

821 """Unpack the crc32 checksum for the ith object from the index file.""" 

822 raise NotImplementedError(self._unpack_crc32_checksum) 

823 

824 def _itersha(self) -> Iterator[bytes]: 

825 """Iterate over all SHA1s in the index.""" 

826 for i in range(len(self)): 

827 yield self._unpack_name(i) 

828 

829 def iterentries(self) -> Iterator[PackIndexEntry]: 

830 """Iterate over the entries in this pack index. 

831 

832 Returns: iterator over tuples with object name, offset in packfile and 

833 crc32 checksum. 

834 """ 

835 for i in range(len(self)): 

836 yield self._unpack_entry(i) 

837 

838 def _read_fan_out_table(self, start_offset: int) -> list[int]: 

839 """Read the fan-out table from the index. 

840 

841 The fan-out table contains 256 entries mapping first byte values 

842 to the number of objects whose SHA1 starts with a byte less than or equal to that value. 

843 

844 Args: 

845 start_offset: Offset in the file where the fan-out table starts 

846 Returns: List of 256 integers 

847 """ 

848 ret = [] 

849 for i in range(0x100): 

850 fanout_entry = self._contents[ 

851 start_offset + i * 4 : start_offset + (i + 1) * 4 

852 ] 

853 ret.append(struct.unpack(">L", fanout_entry)[0]) 

854 return ret 

855 

856 def check(self) -> None: 

857 """Check that the stored checksum matches the actual checksum.""" 

858 actual = self.calculate_checksum() 

859 stored = self.get_stored_checksum() 

860 if actual != stored: 

861 raise ChecksumMismatch(stored, actual) 

862 

863 def calculate_checksum(self) -> bytes: 

864 """Calculate the SHA1 checksum over this pack index. 

865 

866 Returns: This is a 20-byte binary digest 

867 """ 

868 return sha1(self._contents[:-20]).digest() 

869 

870 def get_pack_checksum(self) -> bytes: 

871 """Return the SHA1 checksum stored for the corresponding packfile. 

872 

873 Returns: 20-byte binary digest 

874 """ 

875 return bytes(self._contents[-40:-20]) 

876 

877 def get_stored_checksum(self) -> bytes: 

878 """Return the SHA1 checksum stored for this index. 

879 

880 Returns: 20-byte binary digest 

881 """ 

882 return bytes(self._contents[-20:]) 

883 

884 def object_offset(self, sha: bytes) -> int: 

885 """Return the offset in to the corresponding packfile for the object. 

886 

887 Given the name of an object it will return the offset that object 

888 lives at within the corresponding pack file. If the pack file doesn't 

889 have the object then None will be returned. 

890 """ 

891 if len(sha) == 40: 

892 sha = hex_to_sha(sha) 

893 try: 

894 return self._object_offset(sha) 

895 except ValueError as exc: 

896 closed = getattr(self._contents, "closed", None) 

897 if closed in (None, True): 

898 raise PackFileDisappeared(self) from exc 

899 raise 

900 

901 def _object_offset(self, sha: bytes) -> int: 

902 """See object_offset. 

903 

904 Args: 

905 sha: A *binary* SHA string (20 bytes long). 

906 """ 

907 assert len(sha) == 20 

908 idx = ord(sha[:1]) 

909 if idx == 0: 

910 start = 0 

911 else: 

912 start = self._fan_out_table[idx - 1] 

913 end = self._fan_out_table[idx] 

914 i = bisect_find_sha(start, end, sha, self._unpack_name) 

915 if i is None: 

916 raise KeyError(sha) 

917 return self._unpack_offset(i) 
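
The fan-out arithmetic above can be illustrated with a toy table (a sketch, not real index data): entry i holds the number of objects whose first byte is less than or equal to i, so a group is bounded by the previous entry and its own entry.

    fan_out = list(range(1, 257))  # toy table: exactly one object per first byte
    first_byte = 0xAB
    start = fan_out[first_byte - 1] if first_byte else 0
    end = fan_out[first_byte]
    assert (start, end) == (0xAB, 0xAC)  # bisect_find_sha then searches this range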

918 

919 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]: 

920 """Iterate over all SHA1s with the given prefix.""" 

921 start = ord(prefix[:1]) 

922 if start == 0: 

923 start = 0 

924 else: 

925 start = self._fan_out_table[start - 1] 

926 end = ord(prefix[:1]) + 1 

927 if end == 0x100: 

928 end = len(self) 

929 else: 

930 end = self._fan_out_table[end] 

931 assert start <= end 

932 started = False 

933 for i in range(start, end): 

934 name: bytes = self._unpack_name(i) 

935 if name.startswith(prefix): 

936 yield name 

937 started = True 

938 elif started: 

939 break 

940 

941 

942class PackIndex1(FilePackIndex): 

943 """Version 1 Pack Index file.""" 

944 

945 def __init__( 

946 self, 

947 filename: Union[str, os.PathLike], 

948 file: Optional[Union[IO[bytes], _GitFile]] = None, 

949 contents: Optional[bytes] = None, 

950 size: Optional[int] = None, 

951 ) -> None: 

952 """Initialize a version 1 pack index. 

953 

954 Args: 

955 filename: Path to the index file 

956 file: Optional file object 

957 contents: Optional mmap'd contents 

958 size: Optional size of the index 

959 """ 

960 super().__init__(filename, file, contents, size) 

961 self.version = 1 

962 self._fan_out_table = self._read_fan_out_table(0) 

963 

964 def _unpack_entry(self, i: int) -> tuple[bytes, int, None]: 

965 (offset, name) = unpack_from(">L20s", self._contents, (0x100 * 4) + (i * 24)) 

966 return (name, offset, None) 

967 

968 def _unpack_name(self, i: int) -> bytes: 

969 offset = (0x100 * 4) + (i * 24) + 4 

970 return self._contents[offset : offset + 20] 

971 

972 def _unpack_offset(self, i: int) -> int: 

973 offset = (0x100 * 4) + (i * 24) 

974 return unpack_from(">L", self._contents, offset)[0] 

975 

976 def _unpack_crc32_checksum(self, i: int) -> None: 

977 # Not stored in v1 index files 

978 return None 

979 

980 

981class PackIndex2(FilePackIndex): 

982 """Version 2 Pack Index file.""" 

983 

984 def __init__( 

985 self, 

986 filename: Union[str, os.PathLike], 

987 file: Optional[Union[IO[bytes], _GitFile]] = None, 

988 contents: Optional[bytes] = None, 

989 size: Optional[int] = None, 

990 ) -> None: 

991 """Initialize a version 2 pack index. 

992 

993 Args: 

994 filename: Path to the index file 

995 file: Optional file object 

996 contents: Optional mmap'd contents 

997 size: Optional size of the index 

998 """ 

999 super().__init__(filename, file, contents, size) 

1000 if self._contents[:4] != b"\377tOc": 

1001 raise AssertionError("Not a v2 pack index file") 

1002 (self.version,) = unpack_from(b">L", self._contents, 4) 

1003 if self.version != 2: 

1004 raise AssertionError(f"Version was {self.version}") 

1005 self._fan_out_table = self._read_fan_out_table(8) 

1006 self._name_table_offset = 8 + 0x100 * 4 

1007 self._crc32_table_offset = self._name_table_offset + 20 * len(self) 

1008 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1009 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1010 self 

1011 ) 

1012 

1013 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1014 return ( 

1015 self._unpack_name(i), 

1016 self._unpack_offset(i), 

1017 self._unpack_crc32_checksum(i), 

1018 ) 

1019 

1020 def _unpack_name(self, i: int) -> bytes: 

1021 offset = self._name_table_offset + i * 20 

1022 return self._contents[offset : offset + 20] 

1023 

1024 def _unpack_offset(self, i: int) -> int: 

1025 offset = self._pack_offset_table_offset + i * 4 

1026 offset = unpack_from(">L", self._contents, offset)[0] 

1027 if offset & (2**31): 

1028 offset = self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1029 offset = unpack_from(">Q", self._contents, offset)[0] 

1030 return offset 

1031 

1032 def _unpack_crc32_checksum(self, i: int) -> int: 

1033 return unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1034 

1035 

1036class PackIndex3(FilePackIndex): 

1037 """Version 3 Pack Index file. 

1038 

1039 Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes). 

1040 """ 

1041 

1042 def __init__( 

1043 self, 

1044 filename: Union[str, os.PathLike], 

1045 file: Optional[Union[IO[bytes], _GitFile]] = None, 

1046 contents: Optional[bytes] = None, 

1047 size: Optional[int] = None, 

1048 ) -> None: 

1049 """Initialize a version 3 pack index. 

1050 

1051 Args: 

1052 filename: Path to the index file 

1053 file: Optional file object 

1054 contents: Optional mmap'd contents 

1055 size: Optional size of the index 

1056 """ 

1057 super().__init__(filename, file, contents, size) 

1058 if self._contents[:4] != b"\377tOc": 

1059 raise AssertionError("Not a v3 pack index file") 

1060 (self.version,) = unpack_from(b">L", self._contents, 4) 

1061 if self.version != 3: 

1062 raise AssertionError(f"Version was {self.version}") 

1063 

1064 # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1065 (self.hash_algorithm,) = unpack_from(b">L", self._contents, 8) 

1066 if self.hash_algorithm == 1: 

1067 self.hash_size = 20 # SHA-1 

1068 elif self.hash_algorithm == 2: 

1069 self.hash_size = 32 # SHA-256 

1070 else: 

1071 raise AssertionError(f"Unknown hash algorithm {self.hash_algorithm}") 

1072 

1073 # Read length of shortened object names 

1074 (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12) 

1075 

1076 # Calculate offsets based on variable hash size 

1077 self._fan_out_table = self._read_fan_out_table( 

1078 16 

1079 ) # After header (4 + 4 + 4 + 4) 

1080 self._name_table_offset = 16 + 0x100 * 4 

1081 self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self) 

1082 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self) 

1083 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len( 

1084 self 

1085 ) 

1086 

1087 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]: 

1088 return ( 

1089 self._unpack_name(i), 

1090 self._unpack_offset(i), 

1091 self._unpack_crc32_checksum(i), 

1092 ) 

1093 

1094 def _unpack_name(self, i: int) -> bytes: 

1095 offset = self._name_table_offset + i * self.hash_size 

1096 return self._contents[offset : offset + self.hash_size] 

1097 

1098 def _unpack_offset(self, i: int) -> int: 

1099 offset = self._pack_offset_table_offset + i * 4 

1100 offset = unpack_from(">L", self._contents, offset)[0] 

1101 if offset & (2**31): 

1102 offset = self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8 

1103 offset = unpack_from(">Q", self._contents, offset)[0] 

1104 return offset 

1105 

1106 def _unpack_crc32_checksum(self, i: int) -> int: 

1107 return unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0] 

1108 

1109 

1110def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]: 

1111 """Read the header of a pack file. 

1112 

1113 Args: 

1114 read: Read function 

1115 Returns: Tuple of (pack version, number of objects). 

1116 Raises AssertionError if the header is missing or malformed. 

1117 """ 

1118 header = read(12) 

1119 if not header: 

1120 raise AssertionError("file too short to contain pack") 

1121 if header[:4] != b"PACK": 

1122 raise AssertionError(f"Invalid pack header {header!r}") 

1123 (version,) = unpack_from(b">L", header, 4) 

1124 if version not in (2, 3): 

1125 raise AssertionError(f"Version was {version}") 

1126 (num_objects,) = unpack_from(b">L", header, 8) 

1127 return (version, num_objects) 
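
For example (a sketch using a synthetic header rather than a real pack):

    import struct
    from io import BytesIO

    from dulwich.pack import read_pack_header

    header = struct.pack(">4sLL", b"PACK", 2, 42)  # version 2, 42 objects
    assert read_pack_header(BytesIO(header).read) == (2, 42)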

1128 

1129 

1130def chunks_length(chunks: Union[bytes, Iterable[bytes]]) -> int: 

1131 """Get the total length of a sequence of chunks. 

1132 

1133 Args: 

1134 chunks: Either a single bytes object or an iterable of bytes 

1135 Returns: Total length in bytes 

1136 """ 

1137 if isinstance(chunks, bytes): 

1138 return len(chunks) 

1139 else: 

1140 return sum(map(len, chunks)) 

1141 

1142 

1143def unpack_object( 

1144 read_all: Callable[[int], bytes], 

1145 read_some: Optional[Callable[[int], bytes]] = None, 

1146 compute_crc32: bool = False, 

1147 include_comp: bool = False, 

1148 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1149) -> tuple[UnpackedObject, bytes]: 

1150 """Unpack a Git object. 

1151 

1152 Args: 

1153 read_all: Read function that blocks until the number of requested 

1154 bytes are read. 

1155 read_some: Read function that returns at least one byte, but may not 

1156 return the number of bytes requested. 

1157 compute_crc32: If True, compute the CRC32 of the compressed data. If 

1158 False, the returned CRC32 will be None. 

1159 include_comp: If True, include compressed data in the result. 

1160 zlib_bufsize: An optional buffer size for zlib operations. 

1161 Returns: A tuple of (unpacked, unused), where unused is the unused data 

1162 leftover from decompression, and unpacked is an UnpackedObject with 

1163 the following attrs set: 

1164 

1165 * obj_chunks (for non-delta types) 

1166 * pack_type_num 

1167 * delta_base (for delta types) 

1168 * comp_chunks (if include_comp is True) 

1169 * decomp_chunks 

1170 * decomp_len 

1171 * crc32 (if compute_crc32 is True) 

1172 """ 

1173 if read_some is None: 

1174 read_some = read_all 

1175 if compute_crc32: 

1176 crc32 = 0 

1177 else: 

1178 crc32 = None 

1179 

1180 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1181 type_num = (raw[0] >> 4) & 0x07 

1182 size = raw[0] & 0x0F 

1183 for i, byte in enumerate(raw[1:]): 

1184 size += (byte & 0x7F) << ((i * 7) + 4) 

1185 

1186 delta_base: Union[int, bytes, None] 

1187 raw_base = len(raw) 

1188 if type_num == OFS_DELTA: 

1189 raw, crc32 = take_msb_bytes(read_all, crc32=crc32) 

1190 raw_base += len(raw) 

1191 if raw[-1] & 0x80: 

1192 raise AssertionError 

1193 delta_base_offset = raw[0] & 0x7F 

1194 for byte in raw[1:]: 

1195 delta_base_offset += 1 

1196 delta_base_offset <<= 7 

1197 delta_base_offset += byte & 0x7F 

1198 delta_base = delta_base_offset 

1199 elif type_num == REF_DELTA: 

1200 delta_base_obj = read_all(20) 

1201 if crc32 is not None: 

1202 crc32 = binascii.crc32(delta_base_obj, crc32) 

1203 delta_base = delta_base_obj 

1204 raw_base += 20 

1205 else: 

1206 delta_base = None 

1207 

1208 unpacked = UnpackedObject( 

1209 type_num, delta_base=delta_base, decomp_len=size, crc32=crc32 

1210 ) 

1211 unused = read_zlib_chunks( 

1212 read_some, 

1213 unpacked, 

1214 buffer_size=zlib_bufsize, 

1215 include_comp=include_comp, 

1216 ) 

1217 return unpacked, unused 
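
A minimal sketch of decoding a single non-delta entry built by hand (the 20 zero bytes merely stand in for the data that follows an object in a real pack):

    import zlib
    from io import BytesIO

    from dulwich.pack import unpack_object

    body = b"hi"
    header = bytes([(3 << 4) | len(body)])  # type 3 (blob), size 2, MSB clear
    stream = BytesIO(header + zlib.compress(body) + b"\x00" * 20)
    unpacked, unused = unpack_object(stream.read)
    assert unpacked.obj_type_num == 3
    assert b"".join(unpacked.obj_chunks) == body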

1218 

1219 

1220def _compute_object_size(value: tuple[int, Any]) -> int: 

1221 """Compute the size of an unresolved object for use with LRUSizeCache.""" 

1222 (num, obj) = value 

1223 if num in DELTA_TYPES: 

1224 return chunks_length(obj[1]) 

1225 return chunks_length(obj) 

1226 

1227 

1228class PackStreamReader: 

1229 """Class to read a pack stream. 

1230 

1231 The pack is read from a ReceivableProtocol using read() or recv() as 

1232 appropriate. 

1233 """ 

1234 

1235 def __init__( 

1236 self, 

1237 read_all: Callable[[int], bytes], 

1238 read_some: Optional[Callable[[int], bytes]] = None, 

1239 zlib_bufsize: int = _ZLIB_BUFSIZE, 

1240 ) -> None: 

1241 """Initialize pack stream reader. 

1242 

1243 Args: 

1244 read_all: Function to read all requested bytes 

1245 read_some: Function to read some bytes (optional) 

1246 zlib_bufsize: Buffer size for zlib decompression 

1247 """ 

1248 self.read_all = read_all 

1249 if read_some is None: 

1250 self.read_some = read_all 

1251 else: 

1252 self.read_some = read_some 

1253 self.sha = sha1() 

1254 self._offset = 0 

1255 self._rbuf = BytesIO() 

1256 # trailer is a deque to avoid memory allocation on small reads 

1257 self._trailer: deque[int] = deque() 

1258 self._zlib_bufsize = zlib_bufsize 

1259 

1260 def _read(self, read: Callable[[int], bytes], size: int) -> bytes: 

1261 """Read up to size bytes using the given callback. 

1262 

1263 As a side effect, update the verifier's hash (excluding the last 20 

1264 bytes read). 

1265 

1266 Args: 

1267 read: The read callback to read from. 

1268 size: The maximum number of bytes to read; the particular 

1269 behavior is callback-specific. 

1270 Returns: Bytes read 

1271 """ 

1272 data = read(size) 

1273 

1274 # maintain a trailer of the last 20 bytes we've read 

1275 n = len(data) 

1276 self._offset += n 

1277 tn = len(self._trailer) 

1278 if n >= 20: 

1279 to_pop = tn 

1280 to_add = 20 

1281 else: 

1282 to_pop = max(n + tn - 20, 0) 

1283 to_add = n 

1284 self.sha.update( 

1285 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)])) 

1286 ) 

1287 self._trailer.extend(data[-to_add:]) 

1288 

1289 # hash everything but the trailer 

1290 self.sha.update(data[:-to_add]) 

1291 return data 

1292 

1293 def _buf_len(self) -> int: 

1294 buf = self._rbuf 

1295 start = buf.tell() 

1296 buf.seek(0, SEEK_END) 

1297 end = buf.tell() 

1298 buf.seek(start) 

1299 return end - start 

1300 

1301 @property 

1302 def offset(self) -> int: 

1303 """Return current offset in the stream.""" 

1304 return self._offset - self._buf_len() 

1305 

1306 def read(self, size: int) -> bytes: 

1307 """Read, blocking until size bytes are read.""" 

1308 buf_len = self._buf_len() 

1309 if buf_len >= size: 

1310 return self._rbuf.read(size) 

1311 buf_data = self._rbuf.read() 

1312 self._rbuf = BytesIO() 

1313 return buf_data + self._read(self.read_all, size - buf_len) 

1314 

1315 def recv(self, size: int) -> bytes: 

1316 """Read up to size bytes, blocking until one byte is read.""" 

1317 buf_len = self._buf_len() 

1318 if buf_len: 

1319 data = self._rbuf.read(size) 

1320 if size >= buf_len: 

1321 self._rbuf = BytesIO() 

1322 return data 

1323 return self._read(self.read_some, size) 

1324 

1325 def __len__(self) -> int: 

1326 """Return the number of objects in this pack.""" 

1327 return self._num_objects 

1328 

1329 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]: 

1330 """Read the objects in this pack file. 

1331 

1332 Args: 

1333 compute_crc32: If True, compute the CRC32 of the compressed 

1334 data. If False, the returned CRC32 will be None. 

1335 Returns: Iterator over UnpackedObjects with the following members set: 

1336 offset 

1337 obj_type_num 

1338 obj_chunks (for non-delta types) 

1339 delta_base (for delta types) 

1340 decomp_chunks 

1341 decomp_len 

1342 crc32 (if compute_crc32 is True) 

1343 

1344 Raises: 

1345 ChecksumMismatch: if the checksum of the pack contents does not 

1346 match the checksum in the pack trailer. 

1347 zlib.error: if an error occurred during zlib decompression. 

1348 IOError: if an error occurred writing to the output file. 

1349 """ 

1350 pack_version, self._num_objects = read_pack_header(self.read) 

1351 

1352 for _ in range(self._num_objects): 

1353 offset = self.offset 

1354 unpacked, unused = unpack_object( 

1355 self.read, 

1356 read_some=self.recv, 

1357 compute_crc32=compute_crc32, 

1358 zlib_bufsize=self._zlib_bufsize, 

1359 ) 

1360 unpacked.offset = offset 

1361 

1362 # prepend any unused data to current read buffer 

1363 buf = BytesIO() 

1364 buf.write(unused) 

1365 buf.write(self._rbuf.read()) 

1366 buf.seek(0) 

1367 self._rbuf = buf 

1368 

1369 yield unpacked 

1370 

1371 if self._buf_len() < 20: 

1372 # If the read buffer is full, then the last read() got the whole 

1373 # trailer off the wire. If not, it means there is still some of the 

1374 # trailer to read. We need to read() all 20 bytes; N come from the 

1375 # read buffer and (20 - N) come from the wire. 

1376 self.read(20) 

1377 

1378 pack_sha = bytearray(self._trailer) # type: ignore 

1379 if pack_sha != self.sha.digest(): 

1380 raise ChecksumMismatch(sha_to_hex(pack_sha), self.sha.hexdigest()) 
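
As a sketch, a one-object pack stream can be assembled by hand and fed through the reader; the trailing SHA-1 over the preceding bytes is what the checksum check at the end of read_objects() verifies:

    import struct
    import zlib
    from hashlib import sha1
    from io import BytesIO

    from dulwich.pack import PackStreamReader

    body = b"hi"
    entry = bytes([(3 << 4) | len(body)]) + zlib.compress(body)
    payload = struct.pack(">4sLL", b"PACK", 2, 1) + entry
    reader = PackStreamReader(BytesIO(payload + sha1(payload).digest()).read)
    [obj] = reader.read_objects()
    assert obj.obj_type_num == 3 and b"".join(obj.obj_chunks) == body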

1381 

1382 

1383class PackStreamCopier(PackStreamReader): 

1384 """Class to verify a pack stream as it is being read. 

1385 

1386 The pack is read from a ReceivableProtocol using read() or recv() as 

1387 appropriate and written out to the given file-like object. 

1388 """ 

1389 

1390 def __init__( 

1391 self, 

1392 read_all: Callable, 

1393 read_some: Callable, 

1394 outfile: IO[bytes], 

1395 delta_iter: Optional["DeltaChainIterator"] = None, 

1396 ) -> None: 

1397 """Initialize the copier. 

1398 

1399 Args: 

1400 read_all: Read function that blocks until the number of 

1401 requested bytes are read. 

1402 read_some: Read function that returns at least one byte, but may 

1403 not return the number of bytes requested. 

1404 outfile: File-like object to write output through. 

1405 delta_iter: Optional DeltaChainIterator to record deltas as we 

1406 read them. 

1407 """ 

1408 super().__init__(read_all, read_some=read_some) 

1409 self.outfile = outfile 

1410 self._delta_iter = delta_iter 

1411 

1412 def _read(self, read: Callable, size: int) -> bytes: 

1413 """Read data from the read callback and write it to the file.""" 

1414 data = super()._read(read, size) 

1415 self.outfile.write(data) 

1416 return data 

1417 

1418 def verify(self, progress: Optional[Callable] = None) -> None: 

1419 """Verify a pack stream and write it to the output file. 

1420 

1421 See PackStreamReader.read_objects for a list of exceptions this may 

1422 throw. 

1423 """ 

1424 i = 0 # default count of entries if read_objects() is empty 

1425 for i, unpacked in enumerate(self.read_objects()): 

1426 if self._delta_iter: 

1427 self._delta_iter.record(unpacked) 

1428 if progress is not None: 

1429 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii")) 

1430 if progress is not None: 

1431 progress(f"copied {i} pack entries\n".encode("ascii")) 

1432 

1433 

1434def obj_sha(type: int, chunks: Union[bytes, Iterable[bytes]]) -> bytes: 

1435 """Compute the SHA for a numeric type and object chunks.""" 

1436 sha = sha1() 

1437 sha.update(object_header(type, chunks_length(chunks))) 

1438 if isinstance(chunks, bytes): 

1439 sha.update(chunks) 

1440 else: 

1441 for chunk in chunks: 

1442 sha.update(chunk) 

1443 return sha.digest() 

1444 

1445 

1446def compute_file_sha( 

1447 f: IO[bytes], start_ofs: int = 0, end_ofs: int = 0, buffer_size: int = 1 << 16 

1448) -> "HashObject": 

1449 """Hash a portion of a file into a new SHA. 

1450 

1451 Args: 

1452 f: A file-like object to read from that supports seek(). 

1453 start_ofs: The offset in the file to start reading at. 

1454 end_ofs: The offset in the file to end reading at, relative to the 

1455 end of the file. 

1456 buffer_size: A buffer size for reading. 

1457 Returns: A new SHA object updated with data read from the file. 

1458 """ 

1459 sha = sha1() 

1460 f.seek(0, SEEK_END) 

1461 length = f.tell() 

1462 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length: 

1463 raise AssertionError( 

1464 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}" 

1465 ) 

1466 todo = length + end_ofs - start_ofs 

1467 f.seek(start_ofs) 

1468 while todo: 

1469 data = f.read(min(todo, buffer_size)) 

1470 sha.update(data) 

1471 todo -= len(data) 

1472 return sha 
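
For instance, this is how a pack checksum is computed over everything except the trailing 20-byte digest (a sketch with in-memory data):

    from hashlib import sha1
    from io import BytesIO

    from dulwich.pack import compute_file_sha

    content, trailer = b"0123456789abcdefghij", b"\x00" * 20
    f = BytesIO(content + trailer)
    assert compute_file_sha(f, end_ofs=-20).digest() == sha1(content).digest()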

1473 

1474 

1475class PackData: 

1476 """The data contained in a packfile. 

1477 

1478 Pack files can be accessed both sequentially for exploding a pack, and 

1479 directly with the help of an index to retrieve a specific object. 

1480 

1481 The objects within are either complete or a delta against another. 

1482 

1483 The header is variable length. If the MSB of each byte is set then it 

1484 indicates that the subsequent byte is still part of the header. 

1485 For the first byte the next MS bits are the type, which tells you the type 

1486 of object, and whether it is a delta. The LS 4 bits of that byte are the lowest bits of the 

1487 size. For each subsequent byte the LS 7 bits are the next MS bits of the 

1488 size, i.e. the last byte of the header contains the MS bits of the size. 

1489 

1490 For the complete objects the data is stored as zlib deflated data. 

1491 The size in the header is the uncompressed object size, so to uncompress 

1492 you need to just keep feeding data to zlib until you get an object back, 

1493 or it errors on bad data. This is done here by just giving the complete 

1494 buffer from the start of the deflated object on. This is bad, but until I 

1495 get mmap sorted out it will have to do. 

1496 

1497 Currently there are no integrity checks done. Also no attempt is made to 

1498 try and detect the delta case, or a request for an object at the wrong 

1499 position. It will all just throw a zlib or KeyError. 

1500 """ 

1501 

1502 def __init__( 

1503 self, 

1504 filename: Union[str, os.PathLike], 

1505 file: Optional[IO[bytes]] = None, 

1506 size: Optional[int] = None, 

1507 *, 

1508 delta_window_size: Optional[int] = None, 

1509 window_memory: Optional[int] = None, 

1510 delta_cache_size: Optional[int] = None, 

1511 depth: Optional[int] = None, 

1512 threads: Optional[int] = None, 

1513 big_file_threshold: Optional[int] = None, 

1514 ) -> None: 

1515 """Create a PackData object representing the pack in the given filename. 

1516 

1517 The file must exist and stay readable until the object is disposed of. 

1518 It must also stay the same size. It will be mapped whenever needed. 

1519 

1520 Currently there is a restriction on the size of the pack as the python 

1521 mmap implementation is flawed. 

1522 """ 

1523 self._filename = filename 

1524 self._size = size 

1525 self._header_size = 12 

1526 self.delta_window_size = delta_window_size 

1527 self.window_memory = window_memory 

1528 self.delta_cache_size = delta_cache_size 

1529 self.depth = depth 

1530 self.threads = threads 

1531 self.big_file_threshold = big_file_threshold 

1532 self._file: IO[bytes] 

1533 

1534 if file is None: 

1535 self._file = GitFile(self._filename, "rb") 

1536 else: 

1537 self._file = file 

1538 (version, self._num_objects) = read_pack_header(self._file.read) 

1539 

1540 # Use delta_cache_size config if available, otherwise default 

1541 cache_size = delta_cache_size or (1024 * 1024 * 20) 

1542 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]]( 

1543 cache_size, compute_size=_compute_object_size 

1544 ) 

1545 

1546 @property 

1547 def filename(self) -> str: 

1548 """Get the filename of the pack file. 

1549 

1550 Returns: 

1551 Base filename without directory path 

1552 """ 

1553 return os.path.basename(self._filename) 

1554 

1555 @property 

1556 def path(self) -> Union[str, os.PathLike]: 

1557 """Get the full path of the pack file. 

1558 

1559 Returns: 

1560 Full path to the pack file 

1561 """ 

1562 return self._filename 

1563 

1564 @classmethod 

1565 def from_file(cls, file: IO[bytes], size: Optional[int] = None) -> "PackData": 

1566 """Create a PackData object from an open file. 

1567 

1568 Args: 

1569 file: Open file object 

1570 size: Optional file size 

1571 

1572 Returns: 

1573 PackData instance 

1574 """ 

1575 return cls(str(file), file=file, size=size) 

1576 

1577 @classmethod 

1578 def from_path(cls, path: Union[str, os.PathLike]) -> "PackData": 

1579 """Create a PackData object from a file path. 

1580 

1581 Args: 

1582 path: Path to the pack file 

1583 

1584 Returns: 

1585 PackData instance 

1586 """ 

1587 return cls(filename=path) 

1588 

1589 def close(self) -> None: 

1590 """Close the underlying pack file.""" 

1591 self._file.close() 

1592 

1593 def __enter__(self) -> "PackData": 

1594 """Enter context manager.""" 

1595 return self 

1596 

1597 def __exit__( 

1598 self, 

1599 exc_type: Optional[type], 

1600 exc_val: Optional[BaseException], 

1601 exc_tb: Optional[TracebackType], 

1602 ) -> None: 

1603 """Exit context manager.""" 

1604 self.close() 

1605 

1606 def __eq__(self, other: object) -> bool: 

1607 """Check equality with another object.""" 

1608 if isinstance(other, PackData): 

1609 return self.get_stored_checksum() == other.get_stored_checksum() 

1610 return False 

1611 

1612 def _get_size(self) -> int: 

1613 if self._size is not None: 

1614 return self._size 

1615 self._size = os.path.getsize(self._filename) 

1616 if self._size < self._header_size: 

1617 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})" 

1618 raise AssertionError(errmsg) 

1619 return self._size 

1620 

1621 def __len__(self) -> int: 

1622 """Returns the number of objects in this pack.""" 

1623 return self._num_objects 

1624 

1625 def calculate_checksum(self) -> bytes: 

1626 """Calculate the checksum for this pack. 

1627 

1628 Returns: 20-byte binary SHA1 digest 

1629 """ 

1630 return compute_file_sha(cast(IO[bytes], self._file), end_ofs=-20).digest() 

1631 

1632 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]: 

1633 """Iterate over unpacked objects in the pack.""" 

1634 self._file.seek(self._header_size) 

1635 

1636 if self._num_objects is None: 

1637 return 

1638 

1639 for _ in range(self._num_objects): 

1640 offset = self._file.tell() 

1641 unpacked, unused = unpack_object( 

1642 self._file.read, compute_crc32=False, include_comp=include_comp 

1643 ) 

1644 unpacked.offset = offset 

1645 yield unpacked 

1646 # Back up over unused data. 

1647 self._file.seek(-len(unused), SEEK_CUR) 

1648 

1649 def iterentries( 

1650 self, progress=None, resolve_ext_ref: Optional[ResolveExtRefFn] = None 

1651 ): 

1652 """Yield entries summarizing the contents of this pack. 

1653 

1654 Args: 

1655 progress: Progress function, called with current and total 

1656 object count. 

1657 resolve_ext_ref: Optional function to resolve external references 

1658 Returns: iterator of tuples with (sha, offset, crc32) 

1659 """ 

1660 num_objects = self._num_objects 

1661 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref) 

1662 for i, result in enumerate(indexer): 

1663 if progress is not None: 

1664 progress(i, num_objects) 

1665 yield result 

1666 

1667 def sorted_entries( 

1668 self, 

1669 progress: Optional[ProgressFn] = None, 

1670 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

1671 ) -> list[tuple[bytes, int, int]]: 

1672 """Return entries in this pack, sorted by SHA. 

1673 

1674 Args: 

1675 progress: Progress function, called with current and total 

1676 object count 

1677 resolve_ext_ref: Optional function to resolve external references 

 1678 Returns: List of tuples with (sha, offset, crc32) 

1679 """ 

1680 return sorted( 

1681 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) 

1682 ) 

1683 

1684 def create_index_v1( 

1685 self, 

1686 filename: str, 

1687 progress: Optional[Callable] = None, 

1688 resolve_ext_ref: Optional[Callable] = None, 

1689 ) -> bytes: 

 1690 """Create a version 1 index file for this data file. 

1691 

1692 Args: 

1693 filename: Index filename. 

1694 progress: Progress report function 

1695 resolve_ext_ref: Optional function to resolve external references 

1696 Returns: Checksum of index file 

1697 """ 

1698 entries = self.sorted_entries( 

1699 progress=progress, resolve_ext_ref=resolve_ext_ref 

1700 ) 

1701 checksum = self.calculate_checksum() 

1702 with GitFile(filename, "wb") as f: 

1703 write_pack_index_v1( 

1704 cast(BinaryIO, f), 

1705 cast(list[tuple[bytes, int, Optional[int]]], entries), 

1706 checksum, 

1707 ) 

1708 return checksum 

1709 

1710 def create_index_v2( 

1711 self, 

1712 filename: str, 

1713 progress: Optional[Callable] = None, 

1714 resolve_ext_ref: Optional[Callable] = None, 

1715 ) -> bytes: 

1716 """Create a version 2 index file for this data file. 

1717 

1718 Args: 

1719 filename: Index filename. 

1720 progress: Progress report function 

1721 resolve_ext_ref: Optional function to resolve external references 

1722 Returns: Checksum of index file 

1723 """ 

1724 entries = self.sorted_entries( 

1725 progress=progress, resolve_ext_ref=resolve_ext_ref 

1726 ) 

1727 with GitFile(filename, "wb") as f: 

1728 return write_pack_index_v2(f, entries, self.calculate_checksum()) 

1729 

1730 def create_index_v3( 

1731 self, 

1732 filename: str, 

1733 progress: Optional[Callable] = None, 

1734 resolve_ext_ref: Optional[Callable] = None, 

1735 hash_algorithm: int = 1, 

1736 ) -> bytes: 

1737 """Create a version 3 index file for this data file. 

1738 

1739 Args: 

1740 filename: Index filename. 

1741 progress: Progress report function 

1742 resolve_ext_ref: Function to resolve external references 

1743 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

1744 Returns: Checksum of index file 

1745 """ 

1746 entries = self.sorted_entries( 

1747 progress=progress, resolve_ext_ref=resolve_ext_ref 

1748 ) 

1749 with GitFile(filename, "wb") as f: 

1750 return write_pack_index_v3( 

1751 f, entries, self.calculate_checksum(), hash_algorithm 

1752 ) 

1753 

1754 def create_index( 

1755 self, 

1756 filename: str, 

1757 progress: Optional[Callable] = None, 

1758 version: int = 2, 

1759 resolve_ext_ref: Optional[Callable] = None, 

1760 hash_algorithm: int = 1, 

1761 ) -> bytes: 

1762 """Create an index file for this data file. 

1763 

1764 Args: 

1765 filename: Index filename. 

1766 progress: Progress report function 

1767 version: Index version (1, 2, or 3) 

1768 resolve_ext_ref: Function to resolve external references 

1769 hash_algorithm: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256) 

1770 Returns: Checksum of index file 

1771 """ 

1772 if version == 1: 

1773 return self.create_index_v1( 

1774 filename, progress, resolve_ext_ref=resolve_ext_ref 

1775 ) 

1776 elif version == 2: 

1777 return self.create_index_v2( 

1778 filename, progress, resolve_ext_ref=resolve_ext_ref 

1779 ) 

1780 elif version == 3: 

1781 return self.create_index_v3( 

1782 filename, 

1783 progress, 

1784 resolve_ext_ref=resolve_ext_ref, 

1785 hash_algorithm=hash_algorithm, 

1786 ) 

1787 else: 

1788 raise ValueError(f"unknown index format {version}") 

1789 

1790 def get_stored_checksum(self) -> bytes: 

1791 """Return the expected checksum stored in this pack.""" 

1792 self._file.seek(-20, SEEK_END) 

1793 return self._file.read(20) 

1794 

1795 def check(self) -> None: 

1796 """Check the consistency of this pack.""" 

1797 actual = self.calculate_checksum() 

1798 stored = self.get_stored_checksum() 

1799 if actual != stored: 

1800 raise ChecksumMismatch(stored, actual) 

1801 

1802 def get_unpacked_object_at( 

1803 self, offset: int, *, include_comp: bool = False 

1804 ) -> UnpackedObject: 

 1805 """Given an offset in the packfile, return an UnpackedObject.""" 

1806 assert offset >= self._header_size 

1807 self._file.seek(offset) 

1808 unpacked, _ = unpack_object(self._file.read, include_comp=include_comp) 

1809 unpacked.offset = offset 

1810 return unpacked 

1811 

1812 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]: 

 1813 """Given an offset into the packfile, return the object that is there. 

1814 

1815 Using the associated index the location of an object can be looked up, 

1816 and then the packfile can be asked directly for that object using this 

1817 function. 

1818 """ 

1819 try: 

1820 return self._offset_cache[offset] 

1821 except KeyError: 

1822 pass 

1823 unpacked = self.get_unpacked_object_at(offset, include_comp=False) 

1824 return (unpacked.pack_type_num, unpacked._obj()) 

1825 

1826 
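The PackData methods above cover checksum verification and index generation for a single pack file. A minimal usage sketch (illustrative paths, not from the source):

def _example_build_index() -> bytes:
    # Open an existing pack file, verify its trailing checksum, and write a
    # version 2 index next to it.
    with PackData.from_path("objects/pack/pack-deadbeef.pack") as data:
        data.check()
        return data.create_index("objects/pack/pack-deadbeef.idx", version=2)
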

1827T = TypeVar("T") 

1828 

1829 

1830class DeltaChainIterator(Generic[T]): 

1831 """Abstract iterator over pack data based on delta chains. 

1832 

1833 Each object in the pack is guaranteed to be inflated exactly once, 

1834 regardless of how many objects reference it as a delta base. As a result, 

1835 memory usage is proportional to the length of the longest delta chain. 

1836 

1837 Subclasses can override _result to define the result type of the iterator. 

1838 By default, results are UnpackedObjects with the following members set: 

1839 

1840 * offset 

1841 * obj_type_num 

1842 * obj_chunks 

1843 * pack_type_num 

1844 * delta_base (for delta types) 

1845 * comp_chunks (if _include_comp is True) 

1846 * decomp_chunks 

1847 * decomp_len 

1848 * crc32 (if _compute_crc32 is True) 

1849 """ 

1850 

1851 _compute_crc32 = False 

1852 _include_comp = False 

1853 

1854 def __init__( 

1855 self, 

1856 file_obj: Optional[BinaryIO], 

1857 *, 

1858 resolve_ext_ref: Optional[Callable] = None, 

1859 ) -> None: 

1860 """Initialize DeltaChainIterator. 

1861 

1862 Args: 

1863 file_obj: File object to read pack data from 

1864 resolve_ext_ref: Optional function to resolve external references 

1865 """ 

1866 self._file = file_obj 

1867 self._resolve_ext_ref = resolve_ext_ref 

1868 self._pending_ofs: dict[int, list[int]] = defaultdict(list) 

1869 self._pending_ref: dict[bytes, list[int]] = defaultdict(list) 

1870 self._full_ofs: list[tuple[int, int]] = [] 

1871 self._ext_refs: list[bytes] = [] 

1872 

1873 @classmethod 

1874 def for_pack_data( 

1875 cls, pack_data: PackData, resolve_ext_ref: Optional[Callable] = None 

1876 ) -> "DeltaChainIterator": 

1877 """Create a DeltaChainIterator from pack data. 

1878 

1879 Args: 

1880 pack_data: PackData object to iterate 

1881 resolve_ext_ref: Optional function to resolve external refs 

1882 

1883 Returns: 

1884 DeltaChainIterator instance 

1885 """ 

1886 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1887 walker.set_pack_data(pack_data) 

1888 for unpacked in pack_data.iter_unpacked(include_comp=False): 

1889 walker.record(unpacked) 

1890 return walker 

1891 

1892 @classmethod 

1893 def for_pack_subset( 

1894 cls, 

1895 pack: "Pack", 

1896 shas: Iterable[bytes], 

1897 *, 

1898 allow_missing: bool = False, 

1899 resolve_ext_ref: Optional[Callable] = None, 

1900 ) -> "DeltaChainIterator": 

1901 """Create a DeltaChainIterator for a subset of objects. 

1902 

1903 Args: 

1904 pack: Pack object containing the data 

1905 shas: Iterable of object SHAs to include 

1906 allow_missing: If True, skip missing objects 

1907 resolve_ext_ref: Optional function to resolve external refs 

1908 

1909 Returns: 

1910 DeltaChainIterator instance 

1911 """ 

1912 walker = cls(None, resolve_ext_ref=resolve_ext_ref) 

1913 walker.set_pack_data(pack.data) 

1914 todo = set() 

1915 for sha in shas: 

1916 assert isinstance(sha, bytes) 

1917 try: 

1918 off = pack.index.object_offset(sha) 

1919 except KeyError: 

1920 if not allow_missing: 

1921 raise 

1922 else: 

1923 todo.add(off) 

1924 done = set() 

1925 while todo: 

1926 off = todo.pop() 

1927 unpacked = pack.data.get_unpacked_object_at(off) 

1928 walker.record(unpacked) 

1929 done.add(off) 

1930 base_ofs = None 

1931 if unpacked.pack_type_num == OFS_DELTA: 

1932 assert unpacked.offset is not None 

1933 assert unpacked.delta_base is not None 

1934 assert isinstance(unpacked.delta_base, int) 

1935 base_ofs = unpacked.offset - unpacked.delta_base 

1936 elif unpacked.pack_type_num == REF_DELTA: 

1937 with suppress(KeyError): 

1938 assert isinstance(unpacked.delta_base, bytes) 

1939 base_ofs = pack.index.object_index(unpacked.delta_base) 

1940 if base_ofs is not None and base_ofs not in done: 

1941 todo.add(base_ofs) 

1942 return walker 

1943 

1944 def record(self, unpacked: UnpackedObject) -> None: 

1945 """Record an unpacked object for later processing. 

1946 

1947 Args: 

1948 unpacked: UnpackedObject to record 

1949 """ 

1950 type_num = unpacked.pack_type_num 

1951 offset = unpacked.offset 

1952 assert offset is not None 

1953 if type_num == OFS_DELTA: 

1954 assert unpacked.delta_base is not None 

1955 assert isinstance(unpacked.delta_base, int) 

1956 base_offset = offset - unpacked.delta_base 

1957 self._pending_ofs[base_offset].append(offset) 

1958 elif type_num == REF_DELTA: 

1959 assert isinstance(unpacked.delta_base, bytes) 

1960 self._pending_ref[unpacked.delta_base].append(offset) 

1961 else: 

1962 self._full_ofs.append((offset, type_num)) 

1963 

1964 def set_pack_data(self, pack_data: PackData) -> None: 

1965 """Set the pack data for iteration. 

1966 

1967 Args: 

1968 pack_data: PackData object to use 

1969 """ 

1970 self._file = cast(BinaryIO, pack_data._file) 

1971 

1972 def _walk_all_chains(self) -> Iterator[T]: 

1973 for offset, type_num in self._full_ofs: 

1974 yield from self._follow_chain(offset, type_num, None) 

1975 yield from self._walk_ref_chains() 

1976 assert not self._pending_ofs, repr(self._pending_ofs) 

1977 

1978 def _ensure_no_pending(self) -> None: 

1979 if self._pending_ref: 

1980 raise UnresolvedDeltas([sha_to_hex(s) for s in self._pending_ref]) 

1981 

1982 def _walk_ref_chains(self) -> Iterator[T]: 

1983 if not self._resolve_ext_ref: 

1984 self._ensure_no_pending() 

1985 return 

1986 

1987 for base_sha, pending in sorted(self._pending_ref.items()): 

1988 if base_sha not in self._pending_ref: 

1989 continue 

1990 try: 

1991 type_num, chunks = self._resolve_ext_ref(base_sha) 

1992 except KeyError: 

1993 # Not an external ref, but may depend on one. Either it will 

1994 # get popped via a _follow_chain call, or we will raise an 

1995 # error below. 

1996 continue 

1997 self._ext_refs.append(base_sha) 

1998 self._pending_ref.pop(base_sha) 

1999 for new_offset in pending: 

2000 yield from self._follow_chain(new_offset, type_num, chunks) 

2001 

2002 self._ensure_no_pending() 

2003 

2004 def _result(self, unpacked: UnpackedObject) -> T: 

2005 raise NotImplementedError 

2006 

2007 def _resolve_object( 

2008 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2009 ) -> UnpackedObject: 

2010 assert self._file is not None 

2011 self._file.seek(offset) 

2012 unpacked, _ = unpack_object( 

2013 self._file.read, 

2014 include_comp=self._include_comp, 

2015 compute_crc32=self._compute_crc32, 

2016 ) 

2017 unpacked.offset = offset 

2018 if base_chunks is None: 

2019 assert unpacked.pack_type_num == obj_type_num 

2020 else: 

2021 assert unpacked.pack_type_num in DELTA_TYPES 

2022 unpacked.obj_type_num = obj_type_num 

2023 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks) 

2024 return unpacked 

2025 

2026 def _follow_chain( 

2027 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]] 

2028 ) -> Iterator[T]: 

2029 # Unlike PackData.get_object_at, there is no need to cache offsets as 

2030 # this approach by design inflates each object exactly once. 

2031 todo = [(offset, obj_type_num, base_chunks)] 

2032 while todo: 

2033 (offset, obj_type_num, base_chunks) = todo.pop() 

2034 unpacked = self._resolve_object(offset, obj_type_num, base_chunks) 

2035 yield self._result(unpacked) 

2036 

2037 assert unpacked.offset is not None 

2038 unblocked = chain( 

2039 self._pending_ofs.pop(unpacked.offset, []), 

2040 self._pending_ref.pop(unpacked.sha(), []), 

2041 ) 

2042 todo.extend( 

2043 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore 

2044 for new_offset in unblocked 

2045 ) 

2046 

2047 def __iter__(self) -> Iterator[T]: 

2048 """Iterate over objects in the pack.""" 

2049 return self._walk_all_chains() 

2050 

2051 @property 

2052 def ext_refs(self) -> list[bytes]: 

2053 """Return external references.""" 

2054 return self._ext_refs 

2055 

2056 

2057class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]): 

 2058 """Delta chain iterator that yields unpacked objects.""" 

2059 

2060 def _result(self, unpacked: UnpackedObject) -> UnpackedObject: 

2061 """Return the unpacked object. 

2062 

2063 Args: 

2064 unpacked: The unpacked object 

2065 

2066 Returns: 

2067 The unpacked object unchanged 

2068 """ 

2069 return unpacked 

2070 

2071 

2072class PackIndexer(DeltaChainIterator[PackIndexEntry]): 

2073 """Delta chain iterator that yields index entries.""" 

2074 

2075 _compute_crc32 = True 

2076 

2077 def _result(self, unpacked: UnpackedObject) -> tuple: 

2078 """Convert unpacked object to pack index entry. 

2079 

2080 Args: 

2081 unpacked: The unpacked object 

2082 

2083 Returns: 

2084 Tuple of (sha, offset, crc32) for index entry 

2085 """ 

2086 return unpacked.sha(), unpacked.offset, unpacked.crc32 

2087 

2088 

2089class PackInflater(DeltaChainIterator[ShaFile]): 

2090 """Delta chain iterator that yields ShaFile objects.""" 

2091 

2092 def _result(self, unpacked: UnpackedObject) -> ShaFile: 

2093 """Convert unpacked object to ShaFile. 

2094 

2095 Args: 

2096 unpacked: The unpacked object 

2097 

2098 Returns: 

2099 ShaFile object from the unpacked data 

2100 """ 

2101 return unpacked.sha_file() 

2102 

2103 
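The three subclasses above choose what a delta-chain walk yields: raw UnpackedObjects, index entries, or fully inflated ShaFiles. A minimal sketch using PackInflater, assuming the pack is self-contained (no external delta bases):

def _example_inflate_all(data: PackData) -> list[ShaFile]:
    # Each object in the pack is inflated exactly once, with delta bases
    # resolved in memory as the chains are walked.
    return list(PackInflater.for_pack_data(data))
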

2104class SHA1Reader(BinaryIO): 

2105 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2106 

2107 def __init__(self, f: IO[bytes]) -> None: 

2108 """Initialize SHA1Reader. 

2109 

2110 Args: 

2111 f: File-like object to wrap 

2112 """ 

2113 self.f = f 

2114 self.sha1 = sha1(b"") 

2115 

2116 def read(self, size: int = -1) -> bytes: 

2117 """Read bytes and update SHA1. 

2118 

2119 Args: 

2120 size: Number of bytes to read, -1 for all 

2121 

2122 Returns: 

2123 Bytes read from file 

2124 """ 

2125 data = self.f.read(size) 

2126 self.sha1.update(data) 

2127 return data 

2128 

2129 def check_sha(self, allow_empty: bool = False) -> None: 

2130 """Check if the SHA1 matches the expected value. 

2131 

2132 Args: 

2133 allow_empty: Allow empty SHA1 hash 

2134 

2135 Raises: 

2136 ChecksumMismatch: If SHA1 doesn't match 

2137 """ 

2138 stored = self.f.read(20) 

2139 # If git option index.skipHash is set the index will be empty 

2140 if stored != self.sha1.digest() and ( 

2141 not allow_empty 

2142 or sha_to_hex(stored) != b"0000000000000000000000000000000000000000" 

2143 ): 

2144 raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored)) 

2145 

2146 def close(self) -> None: 

2147 """Close the underlying file.""" 

2148 return self.f.close() 

2149 

2150 def tell(self) -> int: 

2151 """Return current file position.""" 

2152 return self.f.tell() 

2153 

2154 # BinaryIO abstract methods 

2155 def readable(self) -> bool: 

2156 """Check if file is readable.""" 

2157 return True 

2158 

2159 def writable(self) -> bool: 

2160 """Check if file is writable.""" 

2161 return False 

2162 

2163 def seekable(self) -> bool: 

2164 """Check if file is seekable.""" 

2165 return getattr(self.f, "seekable", lambda: False)() 

2166 

2167 def seek(self, offset: int, whence: int = 0) -> int: 

2168 """Seek to position in file. 

2169 

2170 Args: 

2171 offset: Position offset 

2172 whence: Reference point (0=start, 1=current, 2=end) 

2173 

2174 Returns: 

2175 New file position 

2176 """ 

2177 return self.f.seek(offset, whence) 

2178 

2179 def flush(self) -> None: 

2180 """Flush the file buffer.""" 

2181 if hasattr(self.f, "flush"): 

2182 self.f.flush() 

2183 

2184 def readline(self, size: int = -1) -> bytes: 

2185 """Read a line from the file. 

2186 

2187 Args: 

2188 size: Maximum bytes to read 

2189 

2190 Returns: 

2191 Line read from file 

2192 """ 

2193 return self.f.readline(size) 

2194 

2195 def readlines(self, hint: int = -1) -> list[bytes]: 

2196 """Read all lines from the file. 

2197 

2198 Args: 

2199 hint: Approximate number of bytes to read 

2200 

2201 Returns: 

2202 List of lines 

2203 """ 

2204 return self.f.readlines(hint) 

2205 

2206 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2207 """Write multiple lines to the file (not supported).""" 

2208 raise UnsupportedOperation("writelines") 

2209 

2210 def write(self, data: bytes, /) -> int: # type: ignore[override] 

2211 """Write data to the file (not supported).""" 

2212 raise UnsupportedOperation("write") 

2213 

2214 def __enter__(self) -> "SHA1Reader": 

2215 """Enter context manager.""" 

2216 return self 

2217 

2218 def __exit__( 

2219 self, 

2220 type: Optional[type], 

2221 value: Optional[BaseException], 

2222 traceback: Optional[TracebackType], 

2223 ) -> None: 

2224 """Exit context manager and close file.""" 

2225 self.close() 

2226 

2227 def __iter__(self) -> "SHA1Reader": 

2228 """Return iterator for reading file lines.""" 

2229 return self 

2230 

2231 def __next__(self) -> bytes: 

2232 """Get next line from file. 

2233 

2234 Returns: 

2235 Next line 

2236 

2237 Raises: 

2238 StopIteration: When no more lines 

2239 """ 

2240 line = self.readline() 

2241 if not line: 

2242 raise StopIteration 

2243 return line 

2244 

2245 def fileno(self) -> int: 

2246 """Return file descriptor number.""" 

2247 return self.f.fileno() 

2248 

2249 def isatty(self) -> bool: 

2250 """Check if file is a terminal.""" 

2251 return getattr(self.f, "isatty", lambda: False)() 

2252 

2253 def truncate(self, size: Optional[int] = None) -> int: 

2254 """Not supported for read-only file. 

2255 

2256 Raises: 

2257 UnsupportedOperation: Always raised 

2258 """ 

2259 raise UnsupportedOperation("truncate") 

2260 

2261 
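SHA1Reader is how pack and index files are verified: the last 20 bytes of the file are the SHA-1 of everything before them. A small verification sketch (the path is illustrative):

def _example_verify_trailer(path: str) -> None:
    # Hash the payload, then compare against the stored 20-byte trailer.
    size = os.path.getsize(path)
    with open(path, "rb") as raw:
        reader = SHA1Reader(raw)
        reader.read(size - 20)
        reader.check_sha()
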

2262class SHA1Writer(BinaryIO): 

2263 """Wrapper for file-like object that remembers the SHA1 of its data.""" 

2264 

2265 def __init__(self, f) -> None: 

2266 """Initialize SHA1Writer. 

2267 

2268 Args: 

2269 f: File-like object to wrap 

2270 """ 

2271 self.f = f 

2272 self.length = 0 

2273 self.sha1 = sha1(b"") 

2274 self.digest: Optional[bytes] = None 

2275 

2276 def write(self, data) -> int: 

2277 """Write data and update SHA1. 

2278 

2279 Args: 

2280 data: Data to write 

2281 

2282 Returns: 

2283 Number of bytes written 

2284 """ 

2285 self.sha1.update(data) 

2286 self.f.write(data) 

2287 self.length += len(data) 

2288 return len(data) 

2289 

2290 def write_sha(self) -> bytes: 

2291 """Write the SHA1 digest to the file. 

2292 

2293 Returns: 

2294 The SHA1 digest bytes 

2295 """ 

2296 sha = self.sha1.digest() 

2297 assert len(sha) == 20 

2298 self.f.write(sha) 

2299 self.length += len(sha) 

2300 return sha 

2301 

2302 def close(self) -> None: 

2303 """Close the pack file and finalize the SHA.""" 

2304 self.digest = self.write_sha() 

2305 self.f.close() 

2306 

2307 def offset(self) -> int: 

2308 """Get the total number of bytes written. 

2309 

2310 Returns: 

2311 Total bytes written 

2312 """ 

2313 return self.length 

2314 

2315 def tell(self) -> int: 

2316 """Return current file position.""" 

2317 return self.f.tell() 

2318 

2319 # BinaryIO abstract methods 

2320 def readable(self) -> bool: 

2321 """Check if file is readable.""" 

2322 return False 

2323 

2324 def writable(self) -> bool: 

2325 """Check if file is writable.""" 

2326 return True 

2327 

2328 def seekable(self) -> bool: 

2329 """Check if file is seekable.""" 

2330 return getattr(self.f, "seekable", lambda: False)() 

2331 

2332 def seek(self, offset: int, whence: int = 0) -> int: 

2333 """Seek to position in file. 

2334 

2335 Args: 

2336 offset: Position offset 

2337 whence: Reference point (0=start, 1=current, 2=end) 

2338 

2339 Returns: 

2340 New file position 

2341 """ 

2342 return self.f.seek(offset, whence) 

2343 

2344 def flush(self) -> None: 

2345 """Flush the file buffer.""" 

2346 if hasattr(self.f, "flush"): 

2347 self.f.flush() 

2348 

2349 def readline(self, size: int = -1) -> bytes: 

2350 """Not supported for write-only file. 

2351 

2352 Raises: 

2353 UnsupportedOperation: Always raised 

2354 """ 

2355 raise UnsupportedOperation("readline") 

2356 

2357 def readlines(self, hint: int = -1) -> list[bytes]: 

2358 """Not supported for write-only file. 

2359 

2360 Raises: 

2361 UnsupportedOperation: Always raised 

2362 """ 

2363 raise UnsupportedOperation("readlines") 

2364 

2365 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override] 

2366 """Write multiple lines to the file. 

2367 

2368 Args: 

2369 lines: Iterable of lines to write 

2370 """ 

2371 for line in lines: 

2372 self.write(line) 

2373 

2374 def read(self, size: int = -1) -> bytes: 

2375 """Not supported for write-only file. 

2376 

2377 Raises: 

2378 UnsupportedOperation: Always raised 

2379 """ 

2380 raise UnsupportedOperation("read") 

2381 

2382 def __enter__(self) -> "SHA1Writer": 

2383 """Enter context manager.""" 

2384 return self 

2385 

2386 def __exit__( 

2387 self, 

2388 type: Optional[type], 

2389 value: Optional[BaseException], 

2390 traceback: Optional[TracebackType], 

2391 ) -> None: 

2392 """Exit context manager and close file.""" 

2393 self.close() 

2394 

2395 def __iter__(self) -> "SHA1Writer": 

2396 """Return iterator.""" 

2397 return self 

2398 

2399 def __next__(self) -> bytes: 

2400 """Not supported for write-only file. 

2401 

2402 Raises: 

2403 UnsupportedOperation: Always raised 

2404 """ 

2405 raise UnsupportedOperation("__next__") 

2406 

2407 def fileno(self) -> int: 

2408 """Return file descriptor number.""" 

2409 return self.f.fileno() 

2410 

2411 def isatty(self) -> bool: 

2412 """Check if file is a terminal.""" 

2413 return getattr(self.f, "isatty", lambda: False)() 

2414 

2415 def truncate(self, size: Optional[int] = None) -> int: 

2416 """Not supported for write-only file. 

2417 

2418 Raises: 

2419 UnsupportedOperation: Always raised 

2420 """ 

2421 raise UnsupportedOperation("truncate") 

2422 

2423 
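SHA1Writer is the write-side counterpart: it hashes everything written and can append that digest as the file trailer, which is how pack and index files are sealed. A small sketch with an illustrative path:

def _example_sha1_trailer(path: str) -> bytes:
    # Write some payload bytes, then append the 20-byte SHA-1 trailer.
    with open(path, "wb") as raw:
        writer = SHA1Writer(raw)
        writer.write(b"example payload")
        return writer.write_sha()
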

2424def pack_object_header( 

2425 type_num: int, delta_base: Optional[Union[bytes, int]], size: int 

2426) -> bytearray: 

2427 """Create a pack object header for the given object info. 

2428 

2429 Args: 

2430 type_num: Numeric type of the object. 

2431 delta_base: Delta base offset or ref, or None for whole objects. 

2432 size: Uncompressed object size. 

2433 Returns: A header for a packed object. 

2434 """ 

2435 header = [] 

2436 c = (type_num << 4) | (size & 15) 

2437 size >>= 4 

2438 while size: 

2439 header.append(c | 0x80) 

2440 c = size & 0x7F 

2441 size >>= 7 

2442 header.append(c) 

2443 if type_num == OFS_DELTA: 

2444 assert isinstance(delta_base, int) 

2445 ret = [delta_base & 0x7F] 

2446 delta_base >>= 7 

2447 while delta_base: 

2448 delta_base -= 1 

2449 ret.insert(0, 0x80 | (delta_base & 0x7F)) 

2450 delta_base >>= 7 

2451 header.extend(ret) 

2452 elif type_num == REF_DELTA: 

2453 assert isinstance(delta_base, bytes) 

2454 assert len(delta_base) == 20 

2455 header += delta_base 

2456 return bytearray(header) 

2457 

2458 
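pack_object_header() produces the variable-length header used in pack files: the 3-bit type and the low nibble of the size share the first byte, and any remaining size bits follow in 7-bit groups with a continuation flag. A tiny sketch (type number 3 is a blob):

def _example_blob_header() -> bytes:
    # A 300-byte blob needs two header bytes: 0xbc (type 3, low size bits,
    # continuation set) followed by 0x12 (the remaining size bits).
    return bytes(pack_object_header(3, None, 300))
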

2459def pack_object_chunks( 

2460 type: int, 

2461 object: Union[ 

2462 ShaFile, bytes, list[bytes], tuple[Union[bytes, int], Union[bytes, list[bytes]]] 

2463 ], 

2464 compression_level: int = -1, 

2465) -> Iterator[bytes]: 

2466 """Generate chunks for a pack object. 

2467 

2468 Args: 

2469 type: Numeric type of the object 

2470 object: Object to write 

2471 compression_level: the zlib compression level 

2472 Returns: Chunks 

2473 """ 

2474 if type in DELTA_TYPES: 

2475 if isinstance(object, tuple): 

2476 delta_base, object = object 

2477 else: 

2478 raise TypeError("Delta types require a tuple of (delta_base, object)") 

2479 else: 

2480 delta_base = None 

2481 

2482 # Convert object to list of bytes chunks 

2483 if isinstance(object, bytes): 

2484 chunks = [object] 

2485 elif isinstance(object, list): 

2486 chunks = object 

2487 elif isinstance(object, ShaFile): 

2488 chunks = object.as_raw_chunks() 

2489 else: 

2490 # Shouldn't reach here with proper typing 

2491 raise TypeError(f"Unexpected object type: {object.__class__.__name__}") 

2492 

2493 yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks)))) 

2494 compressor = zlib.compressobj(level=compression_level) 

2495 for data in chunks: 

2496 yield compressor.compress(data) 

2497 yield compressor.flush() 

2498 

2499 

2500def write_pack_object( 

2501 write: Callable[[bytes], int], 

2502 type: int, 

2503 object: ShaFile, 

2504 sha: Optional["HashObject"] = None, 

2505 compression_level: int = -1, 

2506) -> int: 

2507 """Write pack object to a file. 

2508 

2509 Args: 

2510 write: Write function to use 

2511 type: Numeric type of the object 

2512 object: Object to write 

2513 sha: Optional SHA-1 hasher to update 

2514 compression_level: the zlib compression level 

2515 Returns: CRC32 checksum of the written object 

2516 """ 

2517 crc32 = 0 

2518 for chunk in pack_object_chunks(type, object, compression_level=compression_level): 

2519 write(chunk) 

2520 if sha is not None: 

2521 sha.update(chunk) 

2522 crc32 = binascii.crc32(chunk, crc32) 

2523 return crc32 & 0xFFFFFFFF 

2524 

2525 

2526def write_pack( 

2527 filename, 

2528 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2529 *, 

2530 deltify: Optional[bool] = None, 

2531 delta_window_size: Optional[int] = None, 

2532 compression_level: int = -1, 

2533): 

2534 """Write a new pack data file. 

2535 

2536 Args: 

2537 filename: Path to the new pack file (without .pack extension) 

2538 objects: Objects to write to the pack 

2539 delta_window_size: Delta window size 

2540 deltify: Whether to deltify pack objects 

2541 compression_level: the zlib compression level 

2542 Returns: Tuple with checksum of pack file and index file 

2543 """ 

2544 with GitFile(filename + ".pack", "wb") as f: 

2545 entries, data_sum = write_pack_objects( 

2546 f.write, 

2547 objects, 

2548 delta_window_size=delta_window_size, 

2549 deltify=deltify, 

2550 compression_level=compression_level, 

2551 ) 

2552 entries = sorted([(k, v[0], v[1]) for (k, v) in entries.items()]) 

2553 with GitFile(filename + ".idx", "wb") as f: 

2554 return data_sum, write_pack_index(f, entries, data_sum) 

2555 

2556 
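write_pack() is the convenience wrapper that writes both the .pack and .idx files from a sequence of objects. A sketch, assuming dulwich.objects.Blob is available and using an illustrative basename:

def _example_write_pack() -> None:
    from dulwich.objects import Blob  # assumed import, used only for the example

    blob = Blob.from_string(b"hello world\n")
    # The basename gets ".pack" and ".idx" appended by write_pack().
    write_pack("/tmp/example-pack", [(blob, None)], deltify=False)
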

2557def pack_header_chunks(num_objects: int) -> Iterator[bytes]: 

2558 """Yield chunks for a pack header.""" 

2559 yield b"PACK" # Pack header 

2560 yield struct.pack(b">L", 2) # Pack version 

2561 yield struct.pack(b">L", num_objects) # Number of objects in pack 

2562 

2563 

2564def write_pack_header(write, num_objects) -> None: 

2565 """Write a pack header for the given number of objects.""" 

2566 if hasattr(write, "write"): 

2567 write = write.write 

2568 warnings.warn( 

2569 "write_pack_header() now takes a write rather than file argument", 

2570 DeprecationWarning, 

2571 stacklevel=2, 

2572 ) 

2573 for chunk in pack_header_chunks(num_objects): 

2574 write(chunk) 

2575 

2576 

2577def find_reusable_deltas( 

2578 container: PackedObjectContainer, 

2579 object_ids: set[bytes], 

2580 *, 

2581 other_haves: Optional[set[bytes]] = None, 

2582 progress=None, 

2583) -> Iterator[UnpackedObject]: 

2584 """Find deltas in a pack that can be reused. 

2585 

2586 Args: 

2587 container: Pack container to search for deltas 

2588 object_ids: Set of object IDs to find deltas for 

2589 other_haves: Set of other object IDs we have 

2590 progress: Optional progress reporting callback 

2591 

2592 Returns: 

2593 Iterator of UnpackedObject entries that can be reused 

2594 """ 

2595 if other_haves is None: 

2596 other_haves = set() 

2597 reused = 0 

2598 for i, unpacked in enumerate( 

2599 container.iter_unpacked_subset( 

2600 object_ids, allow_missing=True, convert_ofs_delta=True 

2601 ) 

2602 ): 

2603 if progress is not None and i % 1000 == 0: 

2604 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode()) 

2605 if unpacked.pack_type_num == REF_DELTA: 

2606 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore 

2607 if hexsha in object_ids or hexsha in other_haves: 

2608 yield unpacked 

2609 reused += 1 

2610 if progress is not None: 

2611 progress((f"found {reused} deltas to reuse\n").encode()) 

2612 

2613 

2614def deltify_pack_objects( 

2615 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[bytes]]]], 

2616 *, 

2617 window_size: Optional[int] = None, 

2618 progress=None, 

2619) -> Iterator[UnpackedObject]: 

2620 """Generate deltas for pack objects. 

2621 

2622 Args: 

2623 objects: An iterable of (object, path) tuples to deltify. 

2624 window_size: Window size; None for default 

2625 progress: Optional progress reporting callback 

 2626 Returns: Iterator over UnpackedObject entries; 

 2627 delta_base is None for full-text entries 

2628 """ 

2629 

2630 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, Optional[bytes]]]]: 

2631 for e in objects: 

2632 if isinstance(e, ShaFile): 

2633 yield (e, (e.type_num, None)) 

2634 else: 

2635 yield (e[0], (e[0].type_num, e[1])) 

2636 

2637 yield from deltas_from_sorted_objects( 

2638 sort_objects_for_delta(objects_with_hints()), 

2639 window_size=window_size, 

2640 progress=progress, 

2641 ) 

2642 

2643 

2644def sort_objects_for_delta( 

2645 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[PackHint]]]], 

2646) -> Iterator[ShaFile]: 

2647 """Sort objects for optimal delta compression. 

2648 

2649 Args: 

2650 objects: Iterator of objects or (object, hint) tuples 

2651 

2652 Returns: 

2653 Iterator of sorted ShaFile objects 

2654 """ 

2655 magic = [] 

2656 for entry in objects: 

2657 if isinstance(entry, tuple): 

2658 obj, hint = entry 

2659 if hint is None: 

2660 type_num = None 

2661 path = None 

2662 else: 

2663 (type_num, path) = hint 

2664 else: 

 2665 obj, type_num, path = entry, None, None  # plain ShaFile, no hint 

2666 magic.append((type_num, path, -obj.raw_length(), obj)) 

2667 # Build a list of objects ordered by the magic Linus heuristic 

 2668 # This helps us find good candidate bases to delta against 

2669 magic.sort() 

2670 return (x[3] for x in magic) 

2671 

2672 

2673def deltas_from_sorted_objects( 

2674 objects, window_size: Optional[int] = None, progress=None 

2675): 

2676 """Create deltas from sorted objects. 

2677 

2678 Args: 

2679 objects: Iterator of sorted objects to deltify 

2680 window_size: Delta window size; None for default 

2681 progress: Optional progress reporting callback 

2682 

2683 Returns: 

2684 Iterator of UnpackedObject entries 

2685 """ 

2686 # TODO(jelmer): Use threads 

2687 if window_size is None: 

2688 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE 

2689 

2690 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque() 

2691 for i, o in enumerate(objects): 

2692 if progress is not None and i % 1000 == 0: 

2693 progress((f"generating deltas: {i}\r").encode()) 

2694 raw = o.as_raw_chunks() 

2695 winner = raw 

2696 winner_len = sum(map(len, winner)) 

2697 winner_base = None 

2698 for base_id, base_type_num, base in possible_bases: 

2699 if base_type_num != o.type_num: 

2700 continue 

2701 delta_len = 0 

2702 delta = [] 

2703 for chunk in create_delta(b"".join(base), b"".join(raw)): 

2704 delta_len += len(chunk) 

2705 if delta_len >= winner_len: 

2706 break 

2707 delta.append(chunk) 

2708 else: 

2709 winner_base = base_id 

2710 winner = delta 

2711 winner_len = sum(map(len, winner)) 

2712 yield UnpackedObject( 

2713 o.type_num, 

2714 sha=o.sha().digest(), 

2715 delta_base=winner_base, 

2716 decomp_len=winner_len, 

2717 decomp_chunks=winner, 

2718 ) 

2719 possible_bases.appendleft((o.sha().digest(), o.type_num, raw)) 

2720 while len(possible_bases) > window_size: 

2721 possible_bases.pop() 

2722 

2723 

2724def pack_objects_to_data( 

2725 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2726 *, 

2727 deltify: Optional[bool] = None, 

2728 delta_window_size: Optional[int] = None, 

2729 ofs_delta: bool = True, 

2730 progress=None, 

2731) -> tuple[int, Iterator[UnpackedObject]]: 

2732 """Create pack data from objects. 

2733 

2734 Args: 

2735 objects: Pack objects 

2736 deltify: Whether to deltify pack objects 

2737 delta_window_size: Delta window size 

2738 ofs_delta: Whether to use offset deltas 

2739 progress: Optional progress reporting callback 

 2740 Returns: Tuple of (number of objects, iterator over UnpackedObject entries) 

2741 """ 

2742 # TODO(jelmer): support deltaifying 

2743 count = len(objects) 

2744 if deltify is None: 

2745 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2746 # slow at the moment. 

2747 deltify = False 

2748 if deltify: 

2749 return ( 

2750 count, 

2751 deltify_pack_objects( 

2752 iter(objects), # type: ignore 

2753 window_size=delta_window_size, 

2754 progress=progress, 

2755 ), 

2756 ) 

2757 else: 

2758 

2759 def iter_without_path() -> Iterator[UnpackedObject]: 

2760 for o in objects: 

2761 if isinstance(o, tuple): 

2762 yield full_unpacked_object(o[0]) 

2763 else: 

2764 yield full_unpacked_object(o) 

2765 

2766 return (count, iter_without_path()) 

2767 

2768 

2769def generate_unpacked_objects( 

2770 container: PackedObjectContainer, 

2771 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2772 delta_window_size: Optional[int] = None, 

2773 deltify: Optional[bool] = None, 

2774 reuse_deltas: bool = True, 

2775 ofs_delta: bool = True, 

2776 other_haves: Optional[set[bytes]] = None, 

2777 progress=None, 

2778) -> Iterator[UnpackedObject]: 

2779 """Create pack data from objects. 

2780 

 2781 Returns: Iterator over UnpackedObject entries 

2782 """ 

2783 todo = dict(object_ids) 

2784 if reuse_deltas: 

2785 for unpack in find_reusable_deltas( 

2786 container, set(todo), other_haves=other_haves, progress=progress 

2787 ): 

2788 del todo[sha_to_hex(unpack.sha())] 

2789 yield unpack 

2790 if deltify is None: 

2791 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too 

2792 # slow at the moment. 

2793 deltify = False 

2794 if deltify: 

2795 objects_to_delta = container.iterobjects_subset( 

2796 todo.keys(), allow_missing=False 

2797 ) 

2798 yield from deltas_from_sorted_objects( 

2799 sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta), 

2800 window_size=delta_window_size, 

2801 progress=progress, 

2802 ) 

2803 else: 

2804 for oid in todo: 

2805 yield full_unpacked_object(container[oid]) 

2806 

2807 

2808def full_unpacked_object(o: ShaFile) -> UnpackedObject: 

2809 """Create an UnpackedObject from a ShaFile. 

2810 

2811 Args: 

2812 o: ShaFile object to convert 

2813 

2814 Returns: 

2815 UnpackedObject with full object data 

2816 """ 

2817 return UnpackedObject( 

2818 o.type_num, 

2819 delta_base=None, 

2820 crc32=None, 

2821 decomp_chunks=o.as_raw_chunks(), 

2822 sha=o.sha().digest(), 

2823 ) 

2824 

2825 

2826def write_pack_from_container( 

2827 write, 

2828 container: PackedObjectContainer, 

2829 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]], 

2830 delta_window_size: Optional[int] = None, 

2831 deltify: Optional[bool] = None, 

2832 reuse_deltas: bool = True, 

2833 compression_level: int = -1, 

2834 other_haves: Optional[set[bytes]] = None, 

2835): 

2836 """Write a new pack data file. 

2837 

2838 Args: 

2839 write: write function to use 

2840 container: PackedObjectContainer 

2841 object_ids: Sequence of (object_id, hint) tuples to write 

2842 delta_window_size: Sliding window size for searching for deltas; 

2843 Set to None for default window size. 

2844 deltify: Whether to deltify objects 

2845 reuse_deltas: Whether to reuse existing deltas 

2846 compression_level: the zlib compression level to use 

2847 other_haves: Set of additional object IDs the receiver has 

2848 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2849 """ 

2850 pack_contents_count = len(object_ids) 

2851 pack_contents = generate_unpacked_objects( 

2852 container, 

2853 object_ids, 

2854 delta_window_size=delta_window_size, 

2855 deltify=deltify, 

2856 reuse_deltas=reuse_deltas, 

2857 other_haves=other_haves, 

2858 ) 

2859 

2860 return write_pack_data( 

2861 write, 

2862 pack_contents, 

2863 num_records=pack_contents_count, 

2864 compression_level=compression_level, 

2865 ) 

2866 

2867 

2868def write_pack_objects( 

2869 write, 

2870 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]], 

2871 *, 

2872 delta_window_size: Optional[int] = None, 

2873 deltify: Optional[bool] = None, 

2874 compression_level: int = -1, 

2875): 

2876 """Write a new pack data file. 

2877 

2878 Args: 

2879 write: write function to use 

2880 objects: Sequence of (object, path) tuples to write 

2881 delta_window_size: Sliding window size for searching for deltas; 

2882 Set to None for default window size. 

2883 deltify: Whether to deltify objects 

2884 compression_level: the zlib compression level to use 

2885 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

2886 """ 

2887 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify) 

2888 

2889 return write_pack_data( 

2890 write, 

2891 pack_contents, 

2892 num_records=pack_contents_count, 

2893 compression_level=compression_level, 

2894 ) 

2895 

2896 

2897class PackChunkGenerator: 

2898 """Generator for pack data chunks.""" 

2899 

2900 def __init__( 

2901 self, 

2902 num_records=None, 

2903 records=None, 

2904 progress=None, 

2905 compression_level=-1, 

2906 reuse_compressed=True, 

2907 ) -> None: 

2908 """Initialize PackChunkGenerator. 

2909 

2910 Args: 

2911 num_records: Expected number of records 

2912 records: Iterator of pack records 

2913 progress: Optional progress callback 

2914 compression_level: Compression level (-1 for default) 

2915 reuse_compressed: Whether to reuse compressed chunks 

2916 """ 

2917 self.cs = sha1(b"") 

2918 self.entries: dict[Union[int, bytes], tuple[int, int]] = {} 

2919 self._it = self._pack_data_chunks( 

2920 num_records=num_records, 

2921 records=records, 

2922 progress=progress, 

2923 compression_level=compression_level, 

2924 reuse_compressed=reuse_compressed, 

2925 ) 

2926 

2927 def sha1digest(self) -> bytes: 

2928 """Return the SHA1 digest of the pack data.""" 

2929 return self.cs.digest() 

2930 

2931 def __iter__(self) -> Iterator[bytes]: 

2932 """Iterate over pack data chunks.""" 

2933 return self._it 

2934 

2935 def _pack_data_chunks( 

2936 self, 

2937 records: Iterator[UnpackedObject], 

2938 *, 

2939 num_records=None, 

2940 progress=None, 

2941 compression_level: int = -1, 

2942 reuse_compressed: bool = True, 

2943 ) -> Iterator[bytes]: 

2944 """Iterate pack data file chunks. 

2945 

2946 Args: 

2947 records: Iterator over UnpackedObject 

2948 num_records: Number of records (defaults to len(records) if not specified) 

2949 progress: Function to report progress to 

2950 compression_level: the zlib compression level 

2951 reuse_compressed: Whether to reuse compressed chunks 

 2952 Yields: Pack data chunks (ending with the pack checksum); per-object (offset, crc32) entries are recorded in self.entries 

2953 """ 

2954 # Write the pack 

2955 if num_records is None: 

2956 num_records = len(records) # type: ignore 

2957 offset = 0 

2958 for chunk in pack_header_chunks(num_records): 

2959 yield chunk 

2960 self.cs.update(chunk) 

2961 offset += len(chunk) 

2962 actual_num_records = 0 

2963 for i, unpacked in enumerate(records): 

2964 type_num = unpacked.pack_type_num 

2965 if progress is not None and i % 1000 == 0: 

2966 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii")) 

2967 raw: Union[list[bytes], tuple[int, list[bytes]], tuple[bytes, list[bytes]]] 

2968 if unpacked.delta_base is not None: 

2969 try: 

2970 base_offset, base_crc32 = self.entries[unpacked.delta_base] 

2971 except KeyError: 

2972 type_num = REF_DELTA 

2973 assert isinstance(unpacked.delta_base, bytes) 

2974 raw = (unpacked.delta_base, unpacked.decomp_chunks) 

2975 else: 

2976 type_num = OFS_DELTA 

2977 raw = (offset - base_offset, unpacked.decomp_chunks) 

2978 else: 

2979 raw = unpacked.decomp_chunks 

2980 chunks: Union[list[bytes], Iterator[bytes]] 

2981 if unpacked.comp_chunks is not None and reuse_compressed: 

2982 chunks = unpacked.comp_chunks 

2983 else: 

2984 chunks = pack_object_chunks( 

2985 type_num, raw, compression_level=compression_level 

2986 ) 

2987 crc32 = 0 

2988 object_size = 0 

2989 for chunk in chunks: 

2990 yield chunk 

2991 crc32 = binascii.crc32(chunk, crc32) 

2992 self.cs.update(chunk) 

2993 object_size += len(chunk) 

2994 actual_num_records += 1 

2995 self.entries[unpacked.sha()] = (offset, crc32) 

2996 offset += object_size 

2997 if actual_num_records != num_records: 

2998 raise AssertionError( 

2999 f"actual records written differs: {actual_num_records} != {num_records}" 

3000 ) 

3001 

3002 yield self.cs.digest() 

3003 

3004 

3005def write_pack_data( 

3006 write, 

3007 records: Iterator[UnpackedObject], 

3008 *, 

3009 num_records=None, 

3010 progress=None, 

3011 compression_level=-1, 

3012): 

3013 """Write a new pack data file. 

3014 

3015 Args: 

3016 write: Write function to use 

3017 num_records: Number of records (defaults to len(records) if None) 

3018 records: Iterator over type_num, object_id, delta_base, raw 

3019 progress: Function to report progress to 

3020 compression_level: the zlib compression level 

3021 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum 

3022 """ 

3023 chunk_generator = PackChunkGenerator( 

3024 num_records=num_records, 

3025 records=records, 

3026 progress=progress, 

3027 compression_level=compression_level, 

3028 ) 

3029 for chunk in chunk_generator: 

3030 write(chunk) 

3031 return chunk_generator.entries, chunk_generator.sha1digest() 

3032 

3033 
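write_pack_data() streams records to any write callable; full_unpacked_object() (defined earlier in this module) wraps plain ShaFile objects into the UnpackedObject records it expects. A minimal sketch:

def _example_write_pack_data(objects: list[ShaFile], out: BinaryIO) -> bytes:
    # Returns the pack checksum; the per-object (offset, crc32) entries are
    # also returned but discarded here.
    records = (full_unpacked_object(o) for o in objects)
    _entries, pack_sha = write_pack_data(out.write, records, num_records=len(objects))
    return pack_sha
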

3034def write_pack_index_v1( 

3035 f: BinaryIO, entries: list[tuple[bytes, int, Optional[int]]], pack_checksum: bytes 

3036) -> bytes: 

3037 """Write a new pack index file. 

3038 

3039 Args: 

3040 f: A file-like object to write to 

3041 entries: List of tuples with object name (sha), offset_in_pack, 

3042 and crc32_checksum. 

3043 pack_checksum: Checksum of the pack file. 

3044 Returns: The SHA of the written index file 

3045 """ 

3046 f = SHA1Writer(f) 

3047 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3048 for name, _offset, _entry_checksum in entries: 

3049 fan_out_table[ord(name[:1])] += 1 

3050 # Fan-out table 

3051 for i in range(0x100): 

3052 f.write(struct.pack(">L", fan_out_table[i])) 

3053 fan_out_table[i + 1] += fan_out_table[i] 

3054 for name, offset, _entry_checksum in entries: 

3055 if not (offset <= 0xFFFFFFFF): 

3056 raise TypeError("pack format 1 only supports offsets < 2Gb") 

3057 f.write(struct.pack(">L20s", offset, name)) 

3058 assert len(pack_checksum) == 20 

3059 f.write(pack_checksum) 

3060 return f.write_sha() 

3061 

3062 

3063def _delta_encode_size(size) -> bytes: 

3064 ret = bytearray() 

3065 c = size & 0x7F 

3066 size >>= 7 

3067 while size: 

3068 ret.append(c | 0x80) 

3069 c = size & 0x7F 

3070 size >>= 7 

3071 ret.append(c) 

3072 return bytes(ret) 

3073 

3074 
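The delta size header is a little-endian base-128 varint: seven payload bits per byte, with the high bit set on every byte except the last. Two small checks, as a sketch:

def _example_delta_size_encoding() -> None:
    assert _delta_encode_size(0x10) == b"\x10"      # fits in one byte
    assert _delta_encode_size(0x90) == b"\x90\x01"  # needs a continuation byte
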

3075# The length of delta compression copy operations in version 2 packs is limited 

3076# to 64K. To copy more, we use several copy operations. Version 3 packs allow 

3077# 24-bit lengths in copy operations, but we always make version 2 packs. 

3078_MAX_COPY_LEN = 0xFFFF 

3079 

3080 

3081def _encode_copy_operation(start: int, length: int) -> bytes: 

3082 scratch = bytearray([0x80]) 

3083 for i in range(4): 

3084 if start & 0xFF << i * 8: 

3085 scratch.append((start >> i * 8) & 0xFF) 

3086 scratch[0] |= 1 << i 

3087 for i in range(2): 

3088 if length & 0xFF << i * 8: 

3089 scratch.append((length >> i * 8) & 0xFF) 

3090 scratch[0] |= 1 << (4 + i) 

3091 return bytes(scratch) 

3092 

3093 

3094def create_delta(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]: 

3095 """Use python difflib to work out how to transform base_buf to target_buf. 

3096 

3097 Args: 

3098 base_buf: Base buffer 

3099 target_buf: Target buffer 

3100 """ 

3101 if isinstance(base_buf, list): 

3102 base_buf = b"".join(base_buf) 

3103 if isinstance(target_buf, list): 

3104 target_buf = b"".join(target_buf) 

3105 assert isinstance(base_buf, bytes) 

3106 assert isinstance(target_buf, bytes) 

3107 # write delta header 

3108 yield _delta_encode_size(len(base_buf)) 

3109 yield _delta_encode_size(len(target_buf)) 

3110 # write out delta opcodes 

3111 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf) 

3112 for opcode, i1, i2, j1, j2 in seq.get_opcodes(): 

3113 # Git patch opcodes don't care about deletes! 

3114 # if opcode == 'replace' or opcode == 'delete': 

3115 # pass 

3116 if opcode == "equal": 

3117 # If they are equal, unpacker will use data from base_buf 

3118 # Write out an opcode that says what range to use 

3119 copy_start = i1 

3120 copy_len = i2 - i1 

3121 while copy_len > 0: 

3122 to_copy = min(copy_len, _MAX_COPY_LEN) 

3123 yield _encode_copy_operation(copy_start, to_copy) 

3124 copy_start += to_copy 

3125 copy_len -= to_copy 

3126 if opcode == "replace" or opcode == "insert": 

3127 # If we are replacing a range or adding one, then we just 

3128 # output it to the stream (prefixed by its size) 

3129 s = j2 - j1 

3130 o = j1 

3131 while s > 127: 

3132 yield bytes([127]) 

3133 yield memoryview(target_buf)[o : o + 127] 

3134 s -= 127 

3135 o += 127 

3136 yield bytes([s]) 

3137 yield memoryview(target_buf)[o : o + s] 

3138 

3139 

3140def apply_delta( 

3141 src_buf: Union[bytes, list[bytes]], delta: Union[bytes, list[bytes]] 

3142) -> list[bytes]: 

3143 """Based on the similar function in git's patch-delta.c. 

3144 

3145 Args: 

3146 src_buf: Source buffer 

3147 delta: Delta instructions 

3148 """ 

3149 if not isinstance(src_buf, bytes): 

3150 src_buf = b"".join(src_buf) 

3151 if not isinstance(delta, bytes): 

3152 delta = b"".join(delta) 

3153 out = [] 

3154 index = 0 

3155 delta_length = len(delta) 

3156 

3157 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]: 

3158 size = 0 

3159 i = 0 

3160 while delta: 

3161 cmd = ord(delta[index : index + 1]) 

3162 index += 1 

3163 size |= (cmd & ~0x80) << i 

3164 i += 7 

3165 if not cmd & 0x80: 

3166 break 

3167 return size, index 

3168 

3169 src_size, index = get_delta_header_size(delta, index) 

3170 dest_size, index = get_delta_header_size(delta, index) 

3171 if src_size != len(src_buf): 

3172 raise ApplyDeltaError( 

3173 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}" 

3174 ) 

3175 while index < delta_length: 

3176 cmd = ord(delta[index : index + 1]) 

3177 index += 1 

3178 if cmd & 0x80: 

3179 cp_off = 0 

3180 for i in range(4): 

3181 if cmd & (1 << i): 

3182 x = ord(delta[index : index + 1]) 

3183 index += 1 

3184 cp_off |= x << (i * 8) 

3185 cp_size = 0 

3186 # Version 3 packs can contain copy sizes larger than 64K. 

3187 for i in range(3): 

3188 if cmd & (1 << (4 + i)): 

3189 x = ord(delta[index : index + 1]) 

3190 index += 1 

3191 cp_size |= x << (i * 8) 

3192 if cp_size == 0: 

3193 cp_size = 0x10000 

3194 if ( 

3195 cp_off + cp_size < cp_size 

3196 or cp_off + cp_size > src_size 

3197 or cp_size > dest_size 

3198 ): 

3199 break 

3200 out.append(src_buf[cp_off : cp_off + cp_size]) 

3201 elif cmd != 0: 

3202 out.append(delta[index : index + cmd]) 

3203 index += cmd 

3204 else: 

3205 raise ApplyDeltaError("Invalid opcode 0") 

3206 

3207 if index != delta_length: 

3208 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}") 

3209 

3210 if dest_size != chunks_length(out): 

3211 raise ApplyDeltaError("dest size incorrect") 

3212 

3213 return out 

3214 

3215 
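create_delta() and apply_delta() are inverses: applying the generated delta to the base buffer reproduces the target. A small round-trip sketch:

def _example_delta_roundtrip() -> None:
    base = b"the quick brown fox jumps over the lazy dog\n" * 10
    target = base.replace(b"lazy", b"sleepy")
    delta = b"".join(create_delta(base, target))
    assert b"".join(apply_delta(base, delta)) == target
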

3216def write_pack_index_v2( 

3217 f, entries: Iterable[PackIndexEntry], pack_checksum: bytes 

3218) -> bytes: 

3219 """Write a new pack index file. 

3220 

3221 Args: 

3222 f: File-like object to write to 

3223 entries: List of tuples with object name (sha), offset_in_pack, and 

3224 crc32_checksum. 

3225 pack_checksum: Checksum of the pack file. 

3226 Returns: The SHA of the index file written 

3227 """ 

3228 f = SHA1Writer(f) 

3229 f.write(b"\377tOc") # Magic! 

3230 f.write(struct.pack(">L", 2)) 

3231 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3232 for name, offset, entry_checksum in entries: 

3233 fan_out_table[ord(name[:1])] += 1 

3234 # Fan-out table 

3235 largetable: list[int] = [] 

3236 for i in range(0x100): 

3237 f.write(struct.pack(b">L", fan_out_table[i])) 

3238 fan_out_table[i + 1] += fan_out_table[i] 

3239 for name, offset, entry_checksum in entries: 

3240 f.write(name) 

3241 for name, offset, entry_checksum in entries: 

3242 f.write(struct.pack(b">L", entry_checksum)) 

3243 for name, offset, entry_checksum in entries: 

3244 if offset < 2**31: 

3245 f.write(struct.pack(b">L", offset)) 

3246 else: 

3247 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3248 largetable.append(offset) 

3249 for offset in largetable: 

3250 f.write(struct.pack(b">Q", offset)) 

3251 assert len(pack_checksum) == 20 

3252 f.write(pack_checksum) 

3253 return f.write_sha() 

3254 

3255 
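write_pack_index_v2() expects (binary sha, offset, crc32) entries sorted by sha plus the pack's trailing checksum; PackData.sorted_entries() (earlier in this module) produces exactly that. A sketch with an illustrative path:

def _example_write_idx_v2(data: PackData) -> bytes:
    entries = data.sorted_entries()
    with GitFile("/tmp/example-pack.idx", "wb") as f:
        return write_pack_index_v2(f, entries, data.calculate_checksum())
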

3256def write_pack_index_v3( 

3257 f, entries: Iterable[PackIndexEntry], pack_checksum: bytes, hash_algorithm: int = 1 

3258) -> bytes: 

3259 """Write a new pack index file in v3 format. 

3260 

3261 Args: 

3262 f: File-like object to write to 

3263 entries: List of tuples with object name (sha), offset_in_pack, and 

3264 crc32_checksum. 

3265 pack_checksum: Checksum of the pack file. 

3266 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256) 

3267 Returns: The SHA of the index file written 

3268 """ 

3269 if hash_algorithm == 1: 

3270 hash_size = 20 # SHA-1 

3271 writer_cls = SHA1Writer 

3272 elif hash_algorithm == 2: 

3273 hash_size = 32 # SHA-256 

3274 # TODO: Add SHA256Writer when SHA-256 support is implemented 

3275 raise NotImplementedError("SHA-256 support not yet implemented") 

3276 else: 

3277 raise ValueError(f"Unknown hash algorithm {hash_algorithm}") 

3278 

3279 # Convert entries to list to allow multiple iterations 

3280 entries_list = list(entries) 

3281 

3282 # Calculate shortest unambiguous prefix length for object names 

3283 # For now, use full hash size (this could be optimized) 

3284 shortened_oid_len = hash_size 

3285 

3286 f = writer_cls(f) 

3287 f.write(b"\377tOc") # Magic! 

3288 f.write(struct.pack(">L", 3)) # Version 3 

3289 f.write(struct.pack(">L", hash_algorithm)) # Hash algorithm 

3290 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length 

3291 

3292 fan_out_table: dict[int, int] = defaultdict(lambda: 0) 

3293 for name, offset, entry_checksum in entries_list: 

3294 if len(name) != hash_size: 

3295 raise ValueError( 

3296 f"Object name has wrong length: expected {hash_size}, got {len(name)}" 

3297 ) 

3298 fan_out_table[ord(name[:1])] += 1 

3299 

3300 # Fan-out table 

3301 largetable: list[int] = [] 

3302 for i in range(0x100): 

3303 f.write(struct.pack(b">L", fan_out_table[i])) 

3304 fan_out_table[i + 1] += fan_out_table[i] 

3305 

3306 # Object names table 

3307 for name, offset, entry_checksum in entries_list: 

3308 f.write(name) 

3309 

3310 # CRC32 checksums table 

3311 for name, offset, entry_checksum in entries_list: 

3312 f.write(struct.pack(b">L", entry_checksum)) 

3313 

3314 # Offset table 

3315 for name, offset, entry_checksum in entries_list: 

3316 if offset < 2**31: 

3317 f.write(struct.pack(b">L", offset)) 

3318 else: 

3319 f.write(struct.pack(b">L", 2**31 + len(largetable))) 

3320 largetable.append(offset) 

3321 

3322 # Large offset table 

3323 for offset in largetable: 

3324 f.write(struct.pack(b">Q", offset)) 

3325 

3326 assert len(pack_checksum) == hash_size, ( 

3327 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}" 

3328 ) 

3329 f.write(pack_checksum) 

3330 return f.write_sha() 

3331 

3332 

3333def write_pack_index( 

3334 index_filename, entries, pack_checksum, progress=None, version=None 

3335): 

3336 """Write a pack index file. 

3337 

3338 Args: 

3339 index_filename: Index filename. 

3340 entries: List of (checksum, offset, crc32) tuples 

3341 pack_checksum: Checksum of the pack file. 

3342 progress: Progress function (not currently used) 

3343 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION. 

3344 

3345 Returns: 

3346 SHA of the written index file 

3347 """ 

3348 if version is None: 

3349 version = DEFAULT_PACK_INDEX_VERSION 

3350 

3351 if version == 1: 

3352 return write_pack_index_v1(index_filename, entries, pack_checksum) 

3353 elif version == 2: 

3354 return write_pack_index_v2(index_filename, entries, pack_checksum) 

3355 elif version == 3: 

3356 return write_pack_index_v3(index_filename, entries, pack_checksum) 

3357 else: 

3358 raise ValueError(f"Unsupported pack index version: {version}") 

3359 

3360 

3361class Pack: 

3362 """A Git pack object.""" 

3363 

3364 _data_load: Optional[Callable[[], PackData]] 

3365 _idx_load: Optional[Callable[[], PackIndex]] 

3366 

3367 _data: Optional[PackData] 

3368 _idx: Optional[PackIndex] 

3369 

3370 def __init__( 

3371 self, 

3372 basename, 

3373 resolve_ext_ref: Optional[ResolveExtRefFn] = None, 

3374 *, 

3375 delta_window_size=None, 

3376 window_memory=None, 

3377 delta_cache_size=None, 

3378 depth=None, 

3379 threads=None, 

3380 big_file_threshold=None, 

3381 ) -> None: 

3382 """Initialize a Pack object. 

3383 

3384 Args: 

3385 basename: Base path for pack files (without .pack/.idx extension) 

3386 resolve_ext_ref: Optional function to resolve external references 

3387 delta_window_size: Size of the delta compression window 

3388 window_memory: Memory limit for delta compression window 

3389 delta_cache_size: Size of the delta cache 

3390 depth: Maximum depth for delta chains 

3391 threads: Number of threads to use for operations 

3392 big_file_threshold: Size threshold for big file handling 

3393 """ 

3394 self._basename = basename 

3395 self._data = None 

3396 self._idx = None 

3397 self._idx_path = self._basename + ".idx" 

3398 self._data_path = self._basename + ".pack" 

3399 self.delta_window_size = delta_window_size 

3400 self.window_memory = window_memory 

3401 self.delta_cache_size = delta_cache_size 

3402 self.depth = depth 

3403 self.threads = threads 

3404 self.big_file_threshold = big_file_threshold 

3405 self._data_load = lambda: PackData( 

3406 self._data_path, 

3407 delta_window_size=delta_window_size, 

3408 window_memory=window_memory, 

3409 delta_cache_size=delta_cache_size, 

3410 depth=depth, 

3411 threads=threads, 

3412 big_file_threshold=big_file_threshold, 

3413 ) 

3414 self._idx_load = lambda: load_pack_index(self._idx_path) 

3415 self.resolve_ext_ref = resolve_ext_ref 

3416 

3417 @classmethod 

3418 def from_lazy_objects(cls, data_fn: Callable, idx_fn: Callable) -> "Pack": 

3419 """Create a new pack object from callables to load pack data and index objects.""" 

3420 ret = cls("") 

3421 ret._data_load = data_fn 

3422 ret._idx_load = idx_fn 

3423 return ret 

3424 

3425 @classmethod 

3426 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack": 

3427 """Create a new pack object from pack data and index objects.""" 

3428 ret = cls("") 

3429 ret._data = data 

3430 ret._data_load = None 

3431 ret._idx = idx 

3432 ret._idx_load = None 

3433 ret.check_length_and_checksum() 

3434 return ret 

3435 

3436 def name(self) -> bytes: 

3437 """The SHA over the SHAs of the objects in this pack.""" 

3438 return self.index.objects_sha1() 

3439 

3440 @property 

3441 def data(self) -> PackData: 

3442 """The pack data object being used.""" 

3443 if self._data is None: 

3444 assert self._data_load 

3445 self._data = self._data_load() 

3446 self.check_length_and_checksum() 

3447 return self._data 

3448 

3449 @property 

3450 def index(self) -> PackIndex: 

3451 """The index being used. 

3452 

3453 Note: This may be an in-memory index. 

3454 """ 

3455 if self._idx is None: 

3456 assert self._idx_load 

3457 self._idx = self._idx_load() 

3458 return self._idx 

3459 

3460 def close(self) -> None: 

3461 """Close the pack file and index.""" 

3462 if self._data is not None: 

3463 self._data.close() 

3464 if self._idx is not None: 

3465 self._idx.close() 

3466 

3467 def __enter__(self) -> "Pack": 

3468 """Enter context manager.""" 

3469 return self 

3470 

3471 def __exit__( 

3472 self, 

3473 exc_type: Optional[type], 

3474 exc_val: Optional[BaseException], 

3475 exc_tb: Optional[TracebackType], 

3476 ) -> None: 

3477 """Exit context manager.""" 

3478 self.close() 

3479 

3480 def __eq__(self, other: object) -> bool: 

3481 """Check equality with another pack.""" 

3482 if not isinstance(other, Pack): 

3483 return False 

3484 return self.index == other.index 

3485 

3486 def __len__(self) -> int: 

3487 """Number of entries in this pack.""" 

3488 return len(self.index) 

3489 

3490 def __repr__(self) -> str: 

3491 """Return string representation of this pack.""" 

3492 return f"{self.__class__.__name__}({self._basename!r})" 

3493 

3494 def __iter__(self) -> Iterator[bytes]: 

3495 """Iterate over all the sha1s of the objects in this pack.""" 

3496 return iter(self.index) 

3497 

3498 def check_length_and_checksum(self) -> None: 

3499 """Sanity check the length and checksum of the pack index and data.""" 

3500 assert len(self.index) == len(self.data), ( 

3501 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)" 

3502 ) 

3503 idx_stored_checksum = self.index.get_pack_checksum() 

3504 data_stored_checksum = self.data.get_stored_checksum() 

3505 if ( 

3506 idx_stored_checksum is not None 

3507 and idx_stored_checksum != data_stored_checksum 

3508 ): 

3509 raise ChecksumMismatch( 

3510 sha_to_hex(idx_stored_checksum), 

3511 sha_to_hex(data_stored_checksum), 

3512 ) 

3513 

3514 def check(self) -> None: 

3515 """Check the integrity of this pack. 

3516 

3517 Raises: 

3518 ChecksumMismatch: if a checksum for the index or data is wrong 

3519 """ 

3520 self.index.check() 

3521 self.data.check() 

3522 for obj in self.iterobjects(): 

3523 obj.check() 

3524 # TODO: object connectivity checks 

3525 

3526 def get_stored_checksum(self) -> bytes: 

3527 """Return the stored checksum of the pack data.""" 

3528 return self.data.get_stored_checksum() 

3529 

3530 def pack_tuples(self) -> list[tuple[ShaFile, None]]: 

3531 """Return pack tuples for all objects in pack.""" 

3532 return [(o, None) for o in self.iterobjects()] 

3533 

3534 def __contains__(self, sha1: bytes) -> bool: 

3535 """Check whether this pack contains a particular SHA1.""" 

3536 try: 

3537 self.index.object_offset(sha1) 

3538 return True 

3539 except KeyError: 

3540 return False 

3541 

3542 def get_raw(self, sha1: bytes) -> tuple[int, bytes]: 

3543 """Get raw object data by SHA1.""" 

3544 offset = self.index.object_offset(sha1) 

3545 obj_type, obj = self.data.get_object_at(offset) 

3546 type_num, chunks = self.resolve_object(offset, obj_type, obj) 

3547 return type_num, b"".join(chunks) 

3548 

3549 def __getitem__(self, sha1: bytes) -> ShaFile: 

3550 """Retrieve the specified SHA1.""" 

3551 type, uncomp = self.get_raw(sha1) 

3552 return ShaFile.from_raw_string(type, uncomp, sha=sha1) 

3553 

3554 def iterobjects(self) -> Iterator[ShaFile]: 

3555 """Iterate over the objects in this pack.""" 

3556 return iter( 

3557 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref) 

3558 ) 

3559 

3560 def iterobjects_subset( 

3561 self, shas: Iterable[ObjectID], *, allow_missing: bool = False 

3562 ) -> Iterator[ShaFile]: 

3563 """Iterate over a subset of objects in this pack.""" 

3564 return ( 

3565 uo 

3566 for uo in PackInflater.for_pack_subset( 

3567 self, 

3568 shas, 

3569 allow_missing=allow_missing, 

3570 resolve_ext_ref=self.resolve_ext_ref, 

3571 ) 

3572 if uo.id in shas 

3573 ) 

3574 

3575 def iter_unpacked_subset( 

3576 self, 

3577 shas: Iterable[ObjectID], 

3578 *, 

3579 include_comp: bool = False, 

3580 allow_missing: bool = False, 

3581 convert_ofs_delta: bool = False, 

3582 ) -> Iterator[UnpackedObject]: 

3583 """Iterate over unpacked objects in subset.""" 

3584 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list) 

3585 ofs: dict[int, bytes] = {} 

3586 todo = set(shas) 

3587 for unpacked in self.iter_unpacked(include_comp=include_comp): 

3588 sha = unpacked.sha() 

3589 if unpacked.offset is not None: 

3590 ofs[unpacked.offset] = sha 

3591 hexsha = sha_to_hex(sha) 

3592 if hexsha in todo: 

3593 if unpacked.pack_type_num == OFS_DELTA: 

3594 assert isinstance(unpacked.delta_base, int) 

3595 assert unpacked.offset is not None 

3596 base_offset = unpacked.offset - unpacked.delta_base 

3597 try: 

3598 unpacked.delta_base = ofs[base_offset] 

3599 except KeyError: 

3600 ofs_pending[base_offset].append(unpacked) 

3601 continue 

3602 else: 

3603 unpacked.pack_type_num = REF_DELTA 

3604 yield unpacked 

3605 todo.remove(hexsha) 

3606 if unpacked.offset is not None: 

3607 for child in ofs_pending.pop(unpacked.offset, []): 

3608 child.pack_type_num = REF_DELTA 

3609 child.delta_base = sha 

3610 yield child 

3611 assert not ofs_pending 

3612 if not allow_missing and todo: 

3613 raise UnresolvedDeltas(list(todo)) 

3614 
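# Note (illustrative summary of the loop above): objects are scanned in pack
# order while recording offset -> sha.  A requested OFS_DELTA whose base sha
# is already known is rewritten in place as a REF_DELTA; otherwise it is
# parked in ofs_pending keyed by its base offset and emitted as soon as that
# base is seen.  Anything still listed in todo afterwards is reported via
# UnresolvedDeltas unless allow_missing is set.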

3615 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]: 

3616 """Iterate over all unpacked objects in this pack.""" 

3617 ofs_to_entries = { 

3618 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries() 

3619 } 

3620 for unpacked in self.data.iter_unpacked(include_comp=include_comp): 

3621 assert unpacked.offset is not None 

3622 (sha, crc32) = ofs_to_entries[unpacked.offset] 

3623 unpacked._sha = sha 

3624 unpacked.crc32 = crc32 

3625 yield unpacked 

3626 

3627 def keep(self, msg: Optional[bytes] = None) -> str: 

3628 """Add a .keep file for the pack, preventing git from garbage collecting it. 

3629 

3630 Args: 

3631 msg: A message written inside the .keep file; can be used later 

3632 to determine whether or not a .keep file is obsolete. 

3633 Returns: The path of the .keep file, as a string. 

3634 """ 

3635 keepfile_name = f"{self._basename}.keep" 

3636 with GitFile(keepfile_name, "wb") as keepfile: 

3637 if msg: 

3638 keepfile.write(msg) 

3639 keepfile.write(b"\n") 

3640 return keepfile_name 

3641 
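# Note (illustrative): pack.keep(b"reason") writes <basename>.keep next to
# the pack file; git's gc/repack treat packs with a .keep file as precious
# and will not delete or repack them away.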

3642 def get_ref(self, sha: bytes) -> tuple[Optional[int], int, OldUnpackedObject]: 

3643 """Get the object for a ref SHA, only looking in this pack.""" 

3644 # TODO: cache these results 

3645 try: 

3646 offset = self.index.object_offset(sha) 

3647 except KeyError: 

3648 offset = None 

3649 if offset: 

3650 type, obj = self.data.get_object_at(offset) 

3651 elif self.resolve_ext_ref: 

3652 type, obj = self.resolve_ext_ref(sha) 

3653 else: 

3654 raise KeyError(sha) 

3655 return offset, type, obj 

3656 

3657 def resolve_object( 

3658 self, offset: int, type: int, obj, get_ref=None 

3659 ) -> tuple[int, Iterable[bytes]]: 

3660 """Resolve an object, possibly resolving deltas when necessary. 

3661 

3662 Returns: Tuple with object type and contents. 

3663 """ 

3664 # Walk down the delta chain, building a stack of deltas to reach 

3665 # the requested object. 

3666 base_offset = offset 

3667 base_type = type 

3668 base_obj = obj 

3669 delta_stack = [] 

3670 while base_type in DELTA_TYPES: 

3671 prev_offset = base_offset 

3672 if get_ref is None: 

3673 get_ref = self.get_ref 

3674 if base_type == OFS_DELTA: 

3675 (delta_offset, delta) = base_obj 

3676 # TODO: clean up asserts and replace with nicer error messages 

3677 base_offset = base_offset - delta_offset 

3678 base_type, base_obj = self.data.get_object_at(base_offset) 

3679 assert isinstance(base_type, int) 

3680 elif base_type == REF_DELTA: 

3681 (basename, delta) = base_obj 

3682 assert isinstance(basename, bytes) and len(basename) == 20 

3683 base_offset, base_type, base_obj = get_ref(basename) 

3684 assert isinstance(base_type, int) 

3685 if base_offset == prev_offset: # object is based on itself 

3686 raise UnresolvedDeltas([basename]) 

3687 delta_stack.append((prev_offset, base_type, delta)) 

3688 

3689 # Now grab the base object (mustn't be a delta) and apply the 

3690 # deltas all the way up the stack. 

3691 chunks = base_obj 

3692 for prev_offset, _delta_type, delta in reversed(delta_stack): 

3693 # Convert chunks to bytes for apply_delta if needed 

3694 if isinstance(chunks, list): 

3695 chunks_bytes = b"".join(chunks) 

3696 elif isinstance(chunks, tuple): 

3697 # For tuple type, second element is the actual data 

3698 _, chunk_data = chunks 

3699 if isinstance(chunk_data, list): 

3700 chunks_bytes = b"".join(chunk_data) 

3701 else: 

3702 chunks_bytes = chunk_data 

3703 else: 

3704 chunks_bytes = chunks 

3705 

3706 # Apply delta and get result as list 

3707 chunks = apply_delta(chunks_bytes, delta) 

3708 

3709 if prev_offset is not None: 

3710 self.data._offset_cache[prev_offset] = base_type, chunks 

3711 return base_type, chunks 

3712 
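# Note (illustrative walk-through of resolve_object above): an OFS_DELTA at
# offset 1000 with delta_offset 400 has its base at offset 1000 - 400 = 600;
# a REF_DELTA instead names its base by a 20-byte sha, which get_ref()
# resolves from this pack's index or via resolve_ext_ref.  The deltas are
# stacked until a non-delta base is reached, then apply_delta() is applied
# in reverse order and the fully resolved result is cached in the data's
# offset cache.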

3713 def entries( 

3714 self, progress: Optional[ProgressFn] = None 

3715 ) -> Iterator[PackIndexEntry]: 

3716 """Yield entries summarizing the contents of this pack. 

3717 

3718 Args: 

3719 progress: Progress function, called with current and total 

3720 object count. 

3721 Returns: iterator of tuples with (sha, offset, crc32) 

3722 """ 

3723 return self.data.iterentries( 

3724 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3725 ) 

3726 

3727 def sorted_entries( 

3728 self, progress: Optional[ProgressFn] = None 

3729 ) -> Iterator[PackIndexEntry]: 

3730 """Return entries in this pack, sorted by SHA. 

3731 

3732 Args: 

3733 progress: Progress function, called with current and total 

3734 object count 

3735 Returns: Iterator of tuples with (sha, offset, crc32) 

3736 """ 

3737 return iter( 

3738 self.data.sorted_entries( 

3739 progress=progress, resolve_ext_ref=self.resolve_ext_ref 

3740 ) 

3741 ) 

3742 

3743 def get_unpacked_object( 

3744 self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True 

3745 ) -> UnpackedObject: 

3746 """Get the unpacked object for a sha. 

3747 

3748 Args: 

3749 sha: SHA of object to fetch 

3750 include_comp: Whether to include compression data in UnpackedObject 

3751 convert_ofs_delta: Whether to convert offset deltas to ref deltas 

3752 """ 

3753 offset = self.index.object_offset(sha) 

3754 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp) 

3755 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta: 

3756 assert isinstance(unpacked.delta_base, int) 

3757 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base) 

3758 unpacked.pack_type_num = REF_DELTA 

3759 return unpacked 

3760 

3761 
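# Illustrative usage sketch (hypothetical basename and object id): reading
# objects out of an on-disk pack.  Pack opens the .idx and .pack files
# lazily, and the context manager closes both.
def _example_read_pack(basename: str, hex_sha: bytes) -> None:
    with Pack(basename) as pack:            # e.g. "objects/pack/pack-1234"
        print(len(pack), "objects")         # __len__ is answered by the index
        if hex_sha in pack:                 # membership check via the index
            obj = pack[hex_sha]             # ShaFile with any deltas resolved
            print(obj.type_name, obj.id)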

3762def extend_pack( 

3763 f: BinaryIO, 

3764 object_ids: set[ObjectID], 

3765 get_raw, 

3766 *, 

3767 compression_level=-1, 

3768 progress=None, 

3769) -> tuple[bytes, list]: 

3770 """Extend a pack file with more objects. 

3771 

3772 The caller should make sure that object_ids does not contain any objects 

3773 that are already in the pack. 

3774 """ 

3775 # Update the header with the new number of objects. 

3776 f.seek(0) 

3777 _version, num_objects = read_pack_header(f.read) 

3778 

3779 if object_ids: 

3780 f.seek(0) 

3781 write_pack_header(f.write, num_objects + len(object_ids)) 

3782 

3783 # Must flush before reading (http://bugs.python.org/issue3207) 

3784 f.flush() 

3785 

3786 # Rescan the rest of the pack, computing the SHA with the new header. 

3787 new_sha = compute_file_sha(f, end_ofs=-20) 

3788 

3789 # Must reposition before writing (http://bugs.python.org/issue3207) 

3790 f.seek(0, os.SEEK_CUR) 

3791 

3792 extra_entries = [] 

3793 

3794 # Complete the pack. 

3795 for i, object_id in enumerate(object_ids): 

3796 if progress is not None: 

3797 progress( 

3798 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii") 

3799 ) 

3800 assert len(object_id) == 20 

3801 type_num, data = get_raw(object_id) 

3802 offset = f.tell() 

3803 crc32 = write_pack_object( 

3804 f.write, 

3805 type_num, 

3806 data, 

3807 sha=new_sha, 

3808 compression_level=compression_level, 

3809 ) 

3810 extra_entries.append((object_id, offset, crc32)) 

3811 pack_sha = new_sha.digest() 

3812 f.write(pack_sha) 

3813 return pack_sha, extra_entries 

3814 
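# Illustrative usage sketch (hypothetical path; assumes store.get_raw returns
# a (type_num, raw_bytes) pair for a 20-byte binary sha, as an object store
# lookup would): completing a thin pack by appending its missing base objects.
def _example_extend_pack(pack_path: str, missing: set, store) -> None:
    with open(pack_path, "r+b") as f:
        pack_sha, extra_entries = extend_pack(
            f,
            missing,        # binary shas not already present in the pack
            store.get_raw,  # callback used to fetch each appended object
        )
    # extra_entries are (sha, offset, crc32) tuples for the appended objects;
    # combined with the original entries they can be fed to write_pack_index().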

3815 

3816try: 

3817 from dulwich._pack import ( # type: ignore 

3818 apply_delta, # type: ignore 

3819 bisect_find_sha, # type: ignore 

3820 ) 

3821except ImportError: 

3822 pass