Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%
1# pack.py -- For dealing with packed git objects.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Classes for dealing with packed git objects.
25A pack is a compact representation of a collection of objects, stored
26using deltas where possible.
28A pack has two parts: the pack file, which stores the data, and an index
29that tells you where the data is.
31To find an object you look in each of the index files until you find a
32match for the object name. You then use the offset recorded there to
33locate the object data in the corresponding pack file.
34"""
36import binascii
37from collections import defaultdict, deque
38from contextlib import suppress
39from io import BytesIO, UnsupportedOperation
41try:
42 from cdifflib import CSequenceMatcher as SequenceMatcher
43except ModuleNotFoundError:
44 from difflib import SequenceMatcher
46import os
47import struct
48import sys
49import warnings
50import zlib
51from collections.abc import Iterable, Iterator, Sequence, Set
52from hashlib import sha1
53from itertools import chain
54from os import SEEK_CUR, SEEK_END
55from struct import unpack_from
56from types import TracebackType
57from typing import (
58 IO,
59 TYPE_CHECKING,
60 Any,
61 BinaryIO,
62 Callable,
63 Generic,
64 Optional,
65 Protocol,
66 TypeVar,
67 Union,
68)
70try:
71 import mmap
72except ImportError:
73 has_mmap = False
74else:
75 has_mmap = True
77if sys.version_info >= (3, 12):
78 from collections.abc import Buffer
79else:
80 Buffer = Union[bytes, bytearray, memoryview]
82if TYPE_CHECKING:
83 from _hashlib import HASH as HashObject
85 from .commit_graph import CommitGraph
87# For some reason the try/except above fails to set has_mmap = False on Plan 9.
88if sys.platform == "Plan9":
89 has_mmap = False
91from . import replace_me
92from .errors import ApplyDeltaError, ChecksumMismatch
93from .file import GitFile, _GitFile
94from .lru_cache import LRUSizeCache
95from .objects import ObjectID, ShaFile, hex_to_sha, object_header, sha_to_hex
97OFS_DELTA = 6
98REF_DELTA = 7
100DELTA_TYPES = (OFS_DELTA, REF_DELTA)
103DEFAULT_PACK_DELTA_WINDOW_SIZE = 10
105# Keep pack files under 16Mb in memory, otherwise write them out to disk
106PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024
108# Default pack index version to use when none is specified
109DEFAULT_PACK_INDEX_VERSION = 2
112OldUnpackedObject = Union[tuple[Union[bytes, int], list[bytes]], list[bytes]]
113ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]]
114ProgressFn = Callable[[int, str], None]
115PackHint = tuple[int, Optional[bytes]]
118class UnresolvedDeltas(Exception):
119 """Delta objects could not be resolved."""
121 def __init__(self, shas: list[bytes]) -> None:
122 """Initialize UnresolvedDeltas exception.
124 Args:
125 shas: List of SHA hashes for unresolved delta objects
126 """
127 self.shas = shas
130class ObjectContainer(Protocol):
131 """Protocol for objects that can contain git objects."""
133 def add_object(self, obj: ShaFile) -> None:
134 """Add a single object to this object store."""
136 def add_objects(
137 self,
138 objects: Sequence[tuple[ShaFile, Optional[str]]],
139 progress: Optional[Callable[..., None]] = None,
140 ) -> Optional["Pack"]:
141 """Add a set of objects to this object store.
143 Args:
144 objects: Iterable over a list of (object, path) tuples
145 progress: Progress callback for object insertion
146 Returns: Optional Pack object of the objects written.
147 """
149 def __contains__(self, sha1: bytes) -> bool:
150 """Check if a hex sha is present."""
152 def __getitem__(self, sha1: bytes) -> ShaFile:
153 """Retrieve an object."""
155 def get_commit_graph(self) -> Optional["CommitGraph"]:
156 """Get the commit graph for this object store.
158 Returns:
159 CommitGraph object if available, None otherwise
160 """
161 return None
164class PackedObjectContainer(ObjectContainer):
165 """Container for objects packed in a pack file."""
167 def get_unpacked_object(
168 self, sha1: bytes, *, include_comp: bool = False
169 ) -> "UnpackedObject":
170 """Get a raw unresolved object.
172 Args:
173 sha1: SHA-1 hash of the object
174 include_comp: Whether to include compressed data
176 Returns:
177 UnpackedObject instance
178 """
179 raise NotImplementedError(self.get_unpacked_object)
181 def iterobjects_subset(
182 self, shas: Iterable[bytes], *, allow_missing: bool = False
183 ) -> Iterator[ShaFile]:
184 """Iterate over a subset of objects.
186 Args:
187 shas: Iterable of object SHAs to retrieve
188 allow_missing: If True, skip missing objects
190 Returns:
191 Iterator of ShaFile objects
192 """
193 raise NotImplementedError(self.iterobjects_subset)
195 def iter_unpacked_subset(
196 self,
197 shas: Iterable[bytes],
198 *,
199 include_comp: bool = False,
200 allow_missing: bool = False,
201 convert_ofs_delta: bool = True,
202 ) -> Iterator["UnpackedObject"]:
203 """Iterate over unpacked objects from a subset of SHAs.
205 Args:
206 shas: Set of object SHAs to retrieve
207 include_comp: Include compressed data if True
208 allow_missing: If True, skip missing objects
209 convert_ofs_delta: If True, convert offset deltas to ref deltas
211 Returns:
212 Iterator of UnpackedObject instances
213 """
214 raise NotImplementedError(self.iter_unpacked_subset)
217class UnpackedObjectStream:
218 """Abstract base class for a stream of unpacked objects."""
220 def __iter__(self) -> Iterator["UnpackedObject"]:
221 """Iterate over unpacked objects."""
222 raise NotImplementedError(self.__iter__)
224 def __len__(self) -> int:
225 """Return the number of objects in the stream."""
226 raise NotImplementedError(self.__len__)
229def take_msb_bytes(
230 read: Callable[[int], bytes], crc32: Optional[int] = None
231) -> tuple[list[int], Optional[int]]:
232 """Read bytes marked with most significant bit.
234 Args:
235 read: Read function
236 crc32: Optional CRC32 checksum to update
238 Returns:
239 Tuple of (list of bytes read, updated CRC32 or None)
240 """
241 ret: list[int] = []
242 while len(ret) == 0 or ret[-1] & 0x80:
243 b = read(1)
244 if crc32 is not None:
245 crc32 = binascii.crc32(b, crc32)
246 ret.append(ord(b[:1]))
247 return ret, crc32
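# Hedged illustration (not part of dulwich) of the MSB-continuation encoding
# read by take_msb_bytes: 0x91 has its high bit set, so reading continues;
# 0x2E does not, so it is the last byte. The input bytes are made up.
def _example_take_msb_bytes() -> None:
    from io import BytesIO

    ret, crc = take_msb_bytes(BytesIO(bytes([0x91, 0x2E])).read)
    assert ret == [0x91, 0x2E]
    assert crc is None  # no starting CRC32 was supplied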
250class PackFileDisappeared(Exception):
251 """Raised when a pack file unexpectedly disappears."""
253 def __init__(self, obj: object) -> None:
254 """Initialize PackFileDisappeared exception.
256 Args:
257 obj: The object that triggered the exception
258 """
259 self.obj = obj
262class UnpackedObject:
263 """Class encapsulating an object unpacked from a pack file.
265 These objects should only be created from within unpack_object. Most
266 members start out as empty and are filled in at various points by
267 read_zlib_chunks, unpack_object, DeltaChainIterator, etc.
269 End users of this object should take care that the function they're getting
270 this object from is guaranteed to set the members they need.
271 """
273 __slots__ = [
274 "_sha", # Cached binary SHA.
275 "comp_chunks", # Compressed object chunks.
276 "crc32", # CRC32.
277 "decomp_chunks", # Decompressed object chunks.
278 "decomp_len", # Decompressed length of this object.
279 "delta_base", # Delta base offset or SHA.
280 "obj_chunks", # Decompressed and delta-resolved chunks.
281 "obj_type_num", # Type of this object.
282 "offset", # Offset in its pack.
283 "pack_type_num", # Type of this object in the pack (may be a delta).
284 ]
286 obj_type_num: Optional[int]
287 obj_chunks: Optional[list[bytes]]
288 delta_base: Union[None, bytes, int]
289 decomp_chunks: list[bytes]
290 comp_chunks: Optional[list[bytes]]
291 decomp_len: Optional[int]
292 crc32: Optional[int]
293 offset: Optional[int]
294 pack_type_num: int
295 _sha: Optional[bytes]
297 # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be
298 # methods of this object.
299 def __init__(
300 self,
301 pack_type_num: int,
302 *,
303 delta_base: Union[None, bytes, int] = None,
304 decomp_len: Optional[int] = None,
305 crc32: Optional[int] = None,
306 sha: Optional[bytes] = None,
307 decomp_chunks: Optional[list[bytes]] = None,
308 offset: Optional[int] = None,
309 ) -> None:
310 """Initialize an UnpackedObject.
312 Args:
313 pack_type_num: Type number of this object in the pack
314 delta_base: Delta base (offset or SHA) if this is a delta object
315 decomp_len: Decompressed length of this object
316 crc32: CRC32 checksum
317 sha: SHA-1 hash of the object
318 decomp_chunks: Decompressed chunks
319 offset: Offset in the pack file
320 """
321 self.offset = offset
322 self._sha = sha
323 self.pack_type_num = pack_type_num
324 self.delta_base = delta_base
325 self.comp_chunks = None
326 self.decomp_chunks: list[bytes] = decomp_chunks or []
327 if decomp_chunks is not None and decomp_len is None:
328 self.decomp_len = sum(map(len, decomp_chunks))
329 else:
330 self.decomp_len = decomp_len
331 self.crc32 = crc32
333 if pack_type_num in DELTA_TYPES:
334 self.obj_type_num = None
335 self.obj_chunks = None
336 else:
337 self.obj_type_num = pack_type_num
338 self.obj_chunks = self.decomp_chunks
339 self.delta_base = delta_base
341 def sha(self) -> bytes:
342 """Return the binary SHA of this object."""
343 if self._sha is None:
344 assert self.obj_type_num is not None and self.obj_chunks is not None
345 self._sha = obj_sha(self.obj_type_num, self.obj_chunks)
346 return self._sha
348 def sha_file(self) -> ShaFile:
349 """Return a ShaFile from this object."""
350 assert self.obj_type_num is not None and self.obj_chunks is not None
351 return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks)
353 # Only provided for backwards compatibility with code that expects either
354 # chunks or a delta tuple.
355 def _obj(self) -> OldUnpackedObject:
356 """Return the decompressed chunks, or (delta base, delta chunks)."""
357 if self.pack_type_num in DELTA_TYPES:
358 assert isinstance(self.delta_base, (bytes, int))
359 return (self.delta_base, self.decomp_chunks)
360 else:
361 return self.decomp_chunks
363 def __eq__(self, other: object) -> bool:
364 """Check equality with another UnpackedObject."""
365 if not isinstance(other, UnpackedObject):
366 return False
367 for slot in self.__slots__:
368 if getattr(self, slot) != getattr(other, slot):
369 return False
370 return True
372 def __ne__(self, other: object) -> bool:
373 """Check inequality with another UnpackedObject."""
374 return not (self == other)
376 def __repr__(self) -> str:
377 """Return string representation of this UnpackedObject."""
378 data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__]
379 return "{}({})".format(self.__class__.__name__, ", ".join(data))
382_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance
385def read_zlib_chunks(
386 read_some: Callable[[int], bytes],
387 unpacked: UnpackedObject,
388 include_comp: bool = False,
389 buffer_size: int = _ZLIB_BUFSIZE,
390) -> bytes:
391 """Read zlib data from a buffer.
393 This function requires that the buffer have additional data following the
394 compressed data, which is guaranteed to be the case for git pack files.
396 Args:
397 read_some: Read function that returns at least one byte, but may
398 return less than the requested size.
399 unpacked: An UnpackedObject to write result data to. If its crc32
400 attr is not None, the CRC32 of the compressed bytes will be computed
401 using this starting CRC32.
402 After this function, will have the following attrs set:
403 * comp_chunks (if include_comp is True)
404 * decomp_chunks
405 * decomp_len
406 * crc32
407 include_comp: If True, include compressed data in the result.
408 buffer_size: Size of the read buffer.
409 Returns: Leftover unused data from the decompression.
411 Raises:
412 zlib.error: if a decompression error occurred.
413 """
414 if unpacked.decomp_len is None or unpacked.decomp_len <= -1:
415 raise ValueError("non-negative zlib data stream size expected")
416 decomp_obj = zlib.decompressobj()
418 comp_chunks = []
419 decomp_chunks = unpacked.decomp_chunks
420 decomp_len = 0
421 crc32 = unpacked.crc32
423 while True:
424 add = read_some(buffer_size)
425 if not add:
426 raise zlib.error("EOF before end of zlib stream")
427 comp_chunks.append(add)
428 decomp = decomp_obj.decompress(add)
429 decomp_len += len(decomp)
430 decomp_chunks.append(decomp)
431 unused = decomp_obj.unused_data
432 if unused:
433 left = len(unused)
434 if crc32 is not None:
435 crc32 = binascii.crc32(add[:-left], crc32)
436 if include_comp:
437 comp_chunks[-1] = add[:-left]
438 break
439 elif crc32 is not None:
440 crc32 = binascii.crc32(add, crc32)
441 if crc32 is not None:
442 crc32 &= 0xFFFFFFFF
444 if decomp_len != unpacked.decomp_len:
445 raise zlib.error("decompressed data does not match expected size")
447 unpacked.crc32 = crc32
448 if include_comp:
449 unpacked.comp_chunks = comp_chunks
450 return unused
453def iter_sha1(iter: Iterable[bytes]) -> bytes:
454 """Return the hexdigest of the SHA1 over a set of names.
456 Args:
457 iter: Iterator over string objects
458 Returns: 40-byte hex sha1 digest
459 """
460 sha = sha1()
461 for name in iter:
462 sha.update(name)
463 return sha.hexdigest().encode("ascii")
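# Hedged illustration (not part of dulwich): iter_sha1 hashes the concatenation
# of the names it is given, so it equals a plain sha1 hexdigest of the joined
# input. The sample names are arbitrary.
def _example_iter_sha1() -> None:
    from hashlib import sha1 as _sha1

    assert iter_sha1([b"abc", b"def"]) == _sha1(b"abcdef").hexdigest().encode("ascii")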
466def load_pack_index(path: Union[str, os.PathLike[str]]) -> "PackIndex":
467 """Load an index file by path.
469 Args:
470 path: Path to the index file
471 Returns: A PackIndex loaded from the given path
472 """
473 with GitFile(path, "rb") as f:
474 return load_pack_index_file(path, f)
477def _load_file_contents(
478 f: Union[IO[bytes], _GitFile], size: Optional[int] = None
479) -> tuple[Union[bytes, Any], int]:
480 """Load contents from a file, preferring mmap when possible.
482 Args:
483 f: File-like object to load
484 size: Expected size, or None to determine from file
485 Returns: Tuple of (contents, size)
486 """
487 try:
488 fd = f.fileno()
489 except (UnsupportedOperation, AttributeError):
490 fd = None
491 # Attempt to use mmap if possible
492 if fd is not None:
493 if size is None:
494 size = os.fstat(fd).st_size
495 if has_mmap:
496 try:
497 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ)
498 except (OSError, ValueError):
499 # Can't mmap - perhaps a socket or invalid file descriptor
500 pass
501 else:
502 return contents, size
503 contents_bytes = f.read()
504 size = len(contents_bytes)
505 return contents_bytes, size
508def load_pack_index_file(
509 path: Union[str, os.PathLike[str]], f: Union[IO[bytes], _GitFile]
510) -> "PackIndex":
511 """Load an index file from a file-like object.
513 Args:
514 path: Path for the index file
515 f: File-like object
516 Returns: A PackIndex loaded from the given file
517 """
518 contents, size = _load_file_contents(f)
519 if contents[:4] == b"\377tOc":
520 version = struct.unpack(b">L", contents[4:8])[0]
521 if version == 2:
522 return PackIndex2(path, file=f, contents=contents, size=size)
523 elif version == 3:
524 return PackIndex3(path, file=f, contents=contents, size=size)
525 else:
526 raise KeyError(f"Unknown pack index format {version}")
527 else:
528 return PackIndex1(path, file=f, contents=contents, size=size)
531def bisect_find_sha(
532 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes]
533) -> Optional[int]:
534 """Find a SHA in a data blob with sorted SHAs.
536 Args:
537 start: Start index of range to search
538 end: End index of range to search
539 sha: Sha to find
540 unpack_name: Callback to retrieve SHA by index
541 Returns: Index of the SHA, or None if it wasn't found
542 """
543 assert start <= end
544 while start <= end:
545 i = (start + end) // 2
546 file_sha = unpack_name(i)
547 if file_sha < sha:
548 start = i + 1
549 elif file_sha > sha:
550 end = i - 1
551 else:
552 return i
553 return None
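# Hedged illustration (not part of dulwich) of the bisection helper above:
# unpack_name is any callable mapping an index to the sha stored there, and the
# [start, end] range is inclusive. The 20-byte shas below are synthetic.
def _example_bisect_find_sha() -> None:
    shas = [b"\x01" * 20, b"\x02" * 20, b"\x03" * 20]  # already sorted
    assert bisect_find_sha(0, len(shas) - 1, b"\x02" * 20, shas.__getitem__) == 1
    assert bisect_find_sha(0, len(shas) - 1, b"\xff" * 20, shas.__getitem__) is None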
556PackIndexEntry = tuple[bytes, int, Optional[int]]
559class PackIndex:
560 """An index in to a packfile.
562 Given a sha id of an object a pack index can tell you the location in the
563 packfile of that object if it has it.
564 """
566 # Default to SHA-1 for backward compatibility
567 hash_algorithm = 1
568 hash_size = 20
570 def __eq__(self, other: object) -> bool:
571 """Check equality with another PackIndex."""
572 if not isinstance(other, PackIndex):
573 return False
575 for (name1, _, _), (name2, _, _) in zip(
576 self.iterentries(), other.iterentries()
577 ):
578 if name1 != name2:
579 return False
580 return True
582 def __ne__(self, other: object) -> bool:
583 """Check if this pack index is not equal to another."""
584 return not self.__eq__(other)
586 def __len__(self) -> int:
587 """Return the number of entries in this pack index."""
588 raise NotImplementedError(self.__len__)
590 def __iter__(self) -> Iterator[bytes]:
591 """Iterate over the SHAs in this pack."""
592 return map(sha_to_hex, self._itersha())
594 def iterentries(self) -> Iterator[PackIndexEntry]:
595 """Iterate over the entries in this pack index.
597 Returns: iterator over tuples with object name, offset in packfile and
598 crc32 checksum.
599 """
600 raise NotImplementedError(self.iterentries)
602 def get_pack_checksum(self) -> Optional[bytes]:
603 """Return the SHA1 checksum stored for the corresponding packfile.
605 Returns: 20-byte binary digest, or None if not available
606 """
607 raise NotImplementedError(self.get_pack_checksum)
609 @replace_me(since="0.21.0", remove_in="0.23.0")
610 def object_index(self, sha: bytes) -> int:
611 """Return the index for the given SHA.
613 Args:
614 sha: SHA-1 hash
616 Returns:
617 Index position
618 """
619 return self.object_offset(sha)
621 def object_offset(self, sha: bytes) -> int:
622 """Return the offset in to the corresponding packfile for the object.
624 Given the name of an object it will return the offset that object
625 lives at within the corresponding pack file. If the pack file doesn't
626 have the object then None will be returned.
627 """
628 raise NotImplementedError(self.object_offset)
630 def object_sha1(self, index: int) -> bytes:
631 """Return the SHA1 corresponding to the index in the pack file."""
632 for name, offset, _crc32 in self.iterentries():
633 if offset == index:
634 return name
635 else:
636 raise KeyError(index)
638 def _object_offset(self, sha: bytes) -> int:
639 """See object_offset.
641 Args:
642 sha: A *binary* SHA string (20 bytes long).
643 """
644 raise NotImplementedError(self._object_offset)
646 def objects_sha1(self) -> bytes:
647 """Return the hex SHA1 over all the shas of all objects in this pack.
649 Note: This is used for the filename of the pack.
650 """
651 return iter_sha1(self._itersha())
653 def _itersha(self) -> Iterator[bytes]:
654 """Yield all the SHA1's of the objects in the index, sorted."""
655 raise NotImplementedError(self._itersha)
657 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]:
658 """Iterate over all SHA1s with the given prefix.
660 Args:
661 prefix: Binary prefix to match
662 Returns: Iterator of matching SHA1s
663 """
664 # Default implementation for PackIndex classes that don't override
665 for sha, _, _ in self.iterentries():
666 if sha.startswith(prefix):
667 yield sha
669 def close(self) -> None:
670 """Close any open files."""
672 def check(self) -> None:
673 """Check the consistency of this pack index."""
676class MemoryPackIndex(PackIndex):
677 """Pack index that is stored entirely in memory."""
679 def __init__(
680 self,
681 entries: list[tuple[bytes, int, Optional[int]]],
682 pack_checksum: Optional[bytes] = None,
683 ) -> None:
684 """Create a new MemoryPackIndex.
686 Args:
687 entries: Sequence of name, idx, crc32 (sorted)
688 pack_checksum: Optional pack checksum
689 """
690 self._by_sha = {}
691 self._by_offset = {}
692 for name, offset, _crc32 in entries:
693 self._by_sha[name] = offset
694 self._by_offset[offset] = name
695 self._entries = entries
696 self._pack_checksum = pack_checksum
698 def get_pack_checksum(self) -> Optional[bytes]:
699 """Return the SHA checksum stored for the corresponding packfile."""
700 return self._pack_checksum
702 def __len__(self) -> int:
703 """Return the number of entries in this pack index."""
704 return len(self._entries)
706 def object_offset(self, sha: bytes) -> int:
707 """Return the offset for the given SHA.
709 Args:
710 sha: SHA to look up (binary or hex)
711 Returns: Offset in the pack file
712 """
713 if len(sha) == 40:
714 sha = hex_to_sha(sha)
715 return self._by_sha[sha]
717 def object_sha1(self, offset: int) -> bytes:
718 """Return the SHA1 for the object at the given offset."""
719 return self._by_offset[offset]
721 def _itersha(self) -> Iterator[bytes]:
722 """Iterate over all SHA1s in the index."""
723 return iter(self._by_sha)
725 def iterentries(self) -> Iterator[PackIndexEntry]:
726 """Iterate over all index entries."""
727 return iter(self._entries)
729 @classmethod
730 def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex":
731 """Create a MemoryPackIndex from a PackData object."""
732 return MemoryPackIndex(
733 list(pack_data.sorted_entries()), pack_data.get_stored_checksum()
734 )
736 @classmethod
737 def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex":
738 """Create a copy of another PackIndex in memory."""
739 return cls(list(other_index.iterentries()), other_index.get_pack_checksum())
742class FilePackIndex(PackIndex):
743 """Pack index that is based on a file.
745 The index file starts with a fan-out table of 256 four-byte entries,
746 indexed by the first byte of the sha id. The value stored for a byte is
747 the end of the group of entries whose shas share that starting byte;
748 subtracting one from the starting byte and indexing again gives the
749 start of the group. Within a group the entries are sorted by sha id,
750 so once the start and end offsets are known, a bisection search tells
751 whether the value is present.
752 """
754 _fan_out_table: list[int]
755 _file: Union[IO[bytes], _GitFile]
757 def __init__(
758 self,
759 filename: Union[str, os.PathLike[str]],
760 file: Optional[Union[IO[bytes], _GitFile]] = None,
761 contents: Optional[Union[bytes, "mmap.mmap"]] = None,
762 size: Optional[int] = None,
763 ) -> None:
764 """Create a pack index object.
766 Provide it with the name of the index file to consider, and it will map
767 it whenever required.
768 """
769 self._filename = filename
770 # Take the size now, so it can be checked each time we map the file to
771 # ensure that it hasn't changed.
772 if file is None:
773 self._file = GitFile(filename, "rb")
774 else:
775 self._file = file
776 if contents is None:
777 self._contents, self._size = _load_file_contents(self._file, size)
778 else:
779 self._contents = contents
780 self._size = size if size is not None else len(contents)
782 @property
783 def path(self) -> str:
784 """Return the path to this index file."""
785 return os.fspath(self._filename)
787 def __eq__(self, other: object) -> bool:
788 """Check equality with another FilePackIndex."""
789 # Quick optimization:
790 if (
791 isinstance(other, FilePackIndex)
792 and self._fan_out_table != other._fan_out_table
793 ):
794 return False
796 return super().__eq__(other)
798 def close(self) -> None:
799 """Close the underlying file and any mmap."""
800 self._file.close()
801 close_fn = getattr(self._contents, "close", None)
802 if close_fn is not None:
803 close_fn()
805 def __len__(self) -> int:
806 """Return the number of entries in this pack index."""
807 return self._fan_out_table[-1]
809 def _unpack_entry(self, i: int) -> PackIndexEntry:
810 """Unpack the i-th entry in the index file.
812 Returns: Tuple with object name (SHA), offset in pack file and CRC32
813 checksum (if known).
814 """
815 raise NotImplementedError(self._unpack_entry)
817 def _unpack_name(self, i: int) -> bytes:
818 """Unpack the i-th name from the index file."""
819 raise NotImplementedError(self._unpack_name)
821 def _unpack_offset(self, i: int) -> int:
822 """Unpack the i-th object offset from the index file."""
823 raise NotImplementedError(self._unpack_offset)
825 def _unpack_crc32_checksum(self, i: int) -> Optional[int]:
826 """Unpack the crc32 checksum for the ith object from the index file."""
827 raise NotImplementedError(self._unpack_crc32_checksum)
829 def _itersha(self) -> Iterator[bytes]:
830 """Iterate over all SHA1s in the index."""
831 for i in range(len(self)):
832 yield self._unpack_name(i)
834 def iterentries(self) -> Iterator[PackIndexEntry]:
835 """Iterate over the entries in this pack index.
837 Returns: iterator over tuples with object name, offset in packfile and
838 crc32 checksum.
839 """
840 for i in range(len(self)):
841 yield self._unpack_entry(i)
843 def _read_fan_out_table(self, start_offset: int) -> list[int]:
844 """Read the fan-out table from the index.
846 The fan-out table contains 256 entries mapping first byte values
847 to the number of objects with SHA1s less than or equal to that byte.
849 Args:
850 start_offset: Offset in the file where the fan-out table starts
851 Returns: List of 256 integers
852 """
853 ret = []
854 for i in range(0x100):
855 fanout_entry = self._contents[
856 start_offset + i * 4 : start_offset + (i + 1) * 4
857 ]
858 ret.append(struct.unpack(">L", fanout_entry)[0])
859 return ret
861 def check(self) -> None:
862 """Check that the stored checksum matches the actual checksum."""
863 actual = self.calculate_checksum()
864 stored = self.get_stored_checksum()
865 if actual != stored:
866 raise ChecksumMismatch(stored, actual)
868 def calculate_checksum(self) -> bytes:
869 """Calculate the SHA1 checksum over this pack index.
871 Returns: 20-byte binary digest
872 """
873 return sha1(self._contents[:-20]).digest()
875 def get_pack_checksum(self) -> bytes:
876 """Return the SHA1 checksum stored for the corresponding packfile.
878 Returns: 20-byte binary digest
879 """
880 return bytes(self._contents[-40:-20])
882 def get_stored_checksum(self) -> bytes:
883 """Return the SHA1 checksum stored for this index.
885 Returns: 20-byte binary digest
886 """
887 return bytes(self._contents[-20:])
889 def object_offset(self, sha: bytes) -> int:
890 """Return the offset in to the corresponding packfile for the object.
892 Given the name of an object it will return the offset that object
893 lives at within the corresponding pack file. If the pack file doesn't
894 have the object then None will be returned.
895 """
896 if len(sha) == 40:
897 sha = hex_to_sha(sha)
898 try:
899 return self._object_offset(sha)
900 except ValueError as exc:
901 closed = getattr(self._contents, "closed", None)
902 if closed in (None, True):
903 raise PackFileDisappeared(self) from exc
904 raise
906 def _object_offset(self, sha: bytes) -> int:
907 """See object_offset.
909 Args:
910 sha: A *binary* SHA string (20 bytes long).
911 """
912 assert len(sha) == 20
913 idx = ord(sha[:1])
914 if idx == 0:
915 start = 0
916 else:
917 start = self._fan_out_table[idx - 1]
918 end = self._fan_out_table[idx]
919 i = bisect_find_sha(start, end, sha, self._unpack_name)
920 if i is None:
921 raise KeyError(sha)
922 return self._unpack_offset(i)
924 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]:
925 """Iterate over all SHA1s with the given prefix."""
926 start = ord(prefix[:1])
927 if start == 0:
928 start = 0
929 else:
930 start = self._fan_out_table[start - 1]
931 end = ord(prefix[:1]) + 1
932 if end == 0x100:
933 end = len(self)
934 else:
935 end = self._fan_out_table[end]
936 assert start <= end
937 started = False
938 for i in range(start, end):
939 name: bytes = self._unpack_name(i)
940 if name.startswith(prefix):
941 yield name
942 started = True
943 elif started:
944 break
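# Hedged toy model (not part of dulwich) of the fan-out lookup described in the
# FilePackIndex docstring: fan_out[b] holds the number of entries whose sha
# starts with a byte <= b, so [fan_out[b - 1], fan_out[b]) brackets the group
# to bisect. The three synthetic shas below are illustrative only.
def _example_fan_out_lookup() -> None:
    shas = sorted(
        [b"\x10" + b"\x00" * 19, b"\x10" + b"\xff" * 19, b"\xa0" + b"\x00" * 19]
    )
    fan_out = [sum(1 for s in shas if s[0] <= b) for b in range(256)]
    target = b"\x10" + b"\xff" * 19
    first = target[0]
    start = fan_out[first - 1] if first else 0
    end = fan_out[first]
    assert bisect_find_sha(start, end - 1, target, shas.__getitem__) == 1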
947class PackIndex1(FilePackIndex):
948 """Version 1 Pack Index file."""
950 def __init__(
951 self,
952 filename: Union[str, os.PathLike[str]],
953 file: Optional[Union[IO[bytes], _GitFile]] = None,
954 contents: Optional[bytes] = None,
955 size: Optional[int] = None,
956 ) -> None:
957 """Initialize a version 1 pack index.
959 Args:
960 filename: Path to the index file
961 file: Optional file object
962 contents: Optional mmap'd contents
963 size: Optional size of the index
964 """
965 super().__init__(filename, file, contents, size)
966 self.version = 1
967 self._fan_out_table = self._read_fan_out_table(0)
969 def _unpack_entry(self, i: int) -> tuple[bytes, int, None]:
970 (offset, name) = unpack_from(">L20s", self._contents, (0x100 * 4) + (i * 24))
971 return (name, offset, None)
973 def _unpack_name(self, i: int) -> bytes:
974 offset = (0x100 * 4) + (i * 24) + 4
975 return self._contents[offset : offset + 20]
977 def _unpack_offset(self, i: int) -> int:
978 offset = (0x100 * 4) + (i * 24)
979 result = unpack_from(">L", self._contents, offset)[0]
980 assert isinstance(result, int)
981 return result
983 def _unpack_crc32_checksum(self, i: int) -> None:
984 # Not stored in v1 index files
985 return None
988class PackIndex2(FilePackIndex):
989 """Version 2 Pack Index file."""
991 def __init__(
992 self,
993 filename: Union[str, os.PathLike[str]],
994 file: Optional[Union[IO[bytes], _GitFile]] = None,
995 contents: Optional[bytes] = None,
996 size: Optional[int] = None,
997 ) -> None:
998 """Initialize a version 2 pack index.
1000 Args:
1001 filename: Path to the index file
1002 file: Optional file object
1003 contents: Optional mmap'd contents
1004 size: Optional size of the index
1005 """
1006 super().__init__(filename, file, contents, size)
1007 if self._contents[:4] != b"\377tOc":
1008 raise AssertionError("Not a v2 pack index file")
1009 (self.version,) = unpack_from(b">L", self._contents, 4)
1010 if self.version != 2:
1011 raise AssertionError(f"Version was {self.version}")
1012 self._fan_out_table = self._read_fan_out_table(8)
1013 self._name_table_offset = 8 + 0x100 * 4
1014 self._crc32_table_offset = self._name_table_offset + 20 * len(self)
1015 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
1016 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
1017 self
1018 )
1020 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]:
1021 return (
1022 self._unpack_name(i),
1023 self._unpack_offset(i),
1024 self._unpack_crc32_checksum(i),
1025 )
1027 def _unpack_name(self, i: int) -> bytes:
1028 offset = self._name_table_offset + i * 20
1029 return self._contents[offset : offset + 20]
1031 def _unpack_offset(self, i: int) -> int:
1032 offset_pos = self._pack_offset_table_offset + i * 4
1033 offset = unpack_from(">L", self._contents, offset_pos)[0]
1034 assert isinstance(offset, int)
1035 if offset & (2**31):
1036 large_offset_pos = (
1037 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
1038 )
1039 offset = unpack_from(">Q", self._contents, large_offset_pos)[0]
1040 assert isinstance(offset, int)
1041 return offset
1043 def _unpack_crc32_checksum(self, i: int) -> int:
1044 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
1045 assert isinstance(result, int)
1046 return result
1049class PackIndex3(FilePackIndex):
1050 """Version 3 Pack Index file.
1052 Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes).
1053 """
1055 def __init__(
1056 self,
1057 filename: Union[str, os.PathLike[str]],
1058 file: Optional[Union[IO[bytes], _GitFile]] = None,
1059 contents: Optional[bytes] = None,
1060 size: Optional[int] = None,
1061 ) -> None:
1062 """Initialize a version 3 pack index.
1064 Args:
1065 filename: Path to the index file
1066 file: Optional file object
1067 contents: Optional mmap'd contents
1068 size: Optional size of the index
1069 """
1070 super().__init__(filename, file, contents, size)
1071 if self._contents[:4] != b"\377tOc":
1072 raise AssertionError("Not a v3 pack index file")
1073 (self.version,) = unpack_from(b">L", self._contents, 4)
1074 if self.version != 3:
1075 raise AssertionError(f"Version was {self.version}")
1077 # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
1078 (self.hash_algorithm,) = unpack_from(b">L", self._contents, 8)
1079 if self.hash_algorithm == 1:
1080 self.hash_size = 20 # SHA-1
1081 elif self.hash_algorithm == 2:
1082 self.hash_size = 32 # SHA-256
1083 else:
1084 raise AssertionError(f"Unknown hash algorithm {self.hash_algorithm}")
1086 # Read length of shortened object names
1087 (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12)
1089 # Calculate offsets based on variable hash size
1090 self._fan_out_table = self._read_fan_out_table(
1091 16
1092 ) # After header (4 + 4 + 4 + 4)
1093 self._name_table_offset = 16 + 0x100 * 4
1094 self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self)
1095 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
1096 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
1097 self
1098 )
1100 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]:
1101 return (
1102 self._unpack_name(i),
1103 self._unpack_offset(i),
1104 self._unpack_crc32_checksum(i),
1105 )
1107 def _unpack_name(self, i: int) -> bytes:
1108 offset = self._name_table_offset + i * self.hash_size
1109 return self._contents[offset : offset + self.hash_size]
1111 def _unpack_offset(self, i: int) -> int:
1112 offset_pos = self._pack_offset_table_offset + i * 4
1113 offset = unpack_from(">L", self._contents, offset_pos)[0]
1114 assert isinstance(offset, int)
1115 if offset & (2**31):
1116 large_offset_pos = (
1117 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
1118 )
1119 offset = unpack_from(">Q", self._contents, large_offset_pos)[0]
1120 assert isinstance(offset, int)
1121 return offset
1123 def _unpack_crc32_checksum(self, i: int) -> int:
1124 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
1125 assert isinstance(result, int)
1126 return result
1129def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]:
1130 """Read the header of a pack file.
1132 Args:
1133 read: Read function
1134 Returns: Tuple of (pack version, number of objects).
1135 Raises: AssertionError if the header is missing or malformed.
1136 """
1137 header = read(12)
1138 if not header:
1139 raise AssertionError("file too short to contain pack")
1140 if header[:4] != b"PACK":
1141 raise AssertionError(f"Invalid pack header {header!r}")
1142 (version,) = unpack_from(b">L", header, 4)
1143 if version not in (2, 3):
1144 raise AssertionError(f"Version was {version}")
1145 (num_objects,) = unpack_from(b">L", header, 8)
1146 return (version, num_objects)
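# Hedged illustration (not part of dulwich): a pack starts with the magic
# b"PACK", a 4-byte big-endian version (2 or 3) and a 4-byte big-endian object
# count, which is exactly what read_pack_header parses. The values are arbitrary.
def _example_read_pack_header() -> None:
    from io import BytesIO
    from struct import pack as _pack

    header = _pack(">4sLL", b"PACK", 2, 7)
    assert read_pack_header(BytesIO(header).read) == (2, 7)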
1149def chunks_length(chunks: Union[bytes, Iterable[bytes]]) -> int:
1150 """Get the total length of a sequence of chunks.
1152 Args:
1153 chunks: Either a single bytes object or an iterable of bytes
1154 Returns: Total length in bytes
1155 """
1156 if isinstance(chunks, bytes):
1157 return len(chunks)
1158 else:
1159 return sum(map(len, chunks))
1162def unpack_object(
1163 read_all: Callable[[int], bytes],
1164 read_some: Optional[Callable[[int], bytes]] = None,
1165 compute_crc32: bool = False,
1166 include_comp: bool = False,
1167 zlib_bufsize: int = _ZLIB_BUFSIZE,
1168) -> tuple[UnpackedObject, bytes]:
1169 """Unpack a Git object.
1171 Args:
1172 read_all: Read function that blocks until the number of requested
1173 bytes are read.
1174 read_some: Read function that returns at least one byte, but may not
1175 return the number of bytes requested.
1176 compute_crc32: If True, compute the CRC32 of the compressed data. If
1177 False, the returned CRC32 will be None.
1178 include_comp: If True, include compressed data in the result.
1179 zlib_bufsize: An optional buffer size for zlib operations.
1180 Returns: A tuple of (unpacked, unused), where unused is the unused data
1181 leftover from decompression, and unpacked is an UnpackedObject with
1182 the following attrs set:
1184 * obj_chunks (for non-delta types)
1185 * pack_type_num
1186 * delta_base (for delta types)
1187 * comp_chunks (if include_comp is True)
1188 * decomp_chunks
1189 * decomp_len
1190 * crc32 (if compute_crc32 is True)
1191 """
1192 if read_some is None:
1193 read_some = read_all
1194 if compute_crc32:
1195 crc32 = 0
1196 else:
1197 crc32 = None
1199 raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
1200 type_num = (raw[0] >> 4) & 0x07
1201 size = raw[0] & 0x0F
1202 for i, byte in enumerate(raw[1:]):
1203 size += (byte & 0x7F) << ((i * 7) + 4)
1205 delta_base: Union[int, bytes, None]
1206 raw_base = len(raw)
1207 if type_num == OFS_DELTA:
1208 raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
1209 raw_base += len(raw)
1210 if raw[-1] & 0x80:
1211 raise AssertionError
1212 delta_base_offset = raw[0] & 0x7F
1213 for byte in raw[1:]:
1214 delta_base_offset += 1
1215 delta_base_offset <<= 7
1216 delta_base_offset += byte & 0x7F
1217 delta_base = delta_base_offset
1218 elif type_num == REF_DELTA:
1219 delta_base_obj = read_all(20)
1220 if crc32 is not None:
1221 crc32 = binascii.crc32(delta_base_obj, crc32)
1222 delta_base = delta_base_obj
1223 raw_base += 20
1224 else:
1225 delta_base = None
1227 unpacked = UnpackedObject(
1228 type_num, delta_base=delta_base, decomp_len=size, crc32=crc32
1229 )
1230 unused = read_zlib_chunks(
1231 read_some,
1232 unpacked,
1233 buffer_size=zlib_bufsize,
1234 include_comp=include_comp,
1235 )
1236 return unpacked, unused
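# Hedged illustration (not part of dulwich) of the undeltified case handled by
# unpack_object: one header byte carries type 3 (blob) in bits 4-6 and size 5
# in the low nibble, followed by the zlib stream. The trailing zero padding
# stands in for the data that always follows an entry in a real pack (the next
# entry or the trailer), which read_zlib_chunks relies on.
def _example_unpack_object() -> None:
    import zlib as _zlib
    from io import BytesIO

    body = b"hello"
    entry = bytes([(3 << 4) | len(body)]) + _zlib.compress(body) + b"\x00" * 20
    unpacked, _unused = unpack_object(BytesIO(entry).read)
    assert unpacked.pack_type_num == 3
    assert b"".join(unpacked.decomp_chunks) == body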
1239def _compute_object_size(value: tuple[int, Any]) -> int:
1240 """Compute the size of a unresolved object for use with LRUSizeCache."""
1241 (num, obj) = value
1242 if num in DELTA_TYPES:
1243 return chunks_length(obj[1])
1244 return chunks_length(obj)
1247class PackStreamReader:
1248 """Class to read a pack stream.
1250 The pack is read from a ReceivableProtocol using read() or recv() as
1251 appropriate.
1252 """
1254 def __init__(
1255 self,
1256 read_all: Callable[[int], bytes],
1257 read_some: Optional[Callable[[int], bytes]] = None,
1258 zlib_bufsize: int = _ZLIB_BUFSIZE,
1259 ) -> None:
1260 """Initialize pack stream reader.
1262 Args:
1263 read_all: Function to read all requested bytes
1264 read_some: Function to read some bytes (optional)
1265 zlib_bufsize: Buffer size for zlib decompression
1266 """
1267 self.read_all = read_all
1268 if read_some is None:
1269 self.read_some = read_all
1270 else:
1271 self.read_some = read_some
1272 self.sha = sha1()
1273 self._offset = 0
1274 self._rbuf = BytesIO()
1275 # trailer is a deque to avoid memory allocation on small reads
1276 self._trailer: deque[int] = deque()
1277 self._zlib_bufsize = zlib_bufsize
1279 def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
1280 """Read up to size bytes using the given callback.
1282 As a side effect, update the verifier's hash (excluding the last 20
1283 bytes read).
1285 Args:
1286 read: The read callback to read from.
1287 size: The maximum number of bytes to read; the particular
1288 behavior is callback-specific.
1289 Returns: Bytes read
1290 """
1291 data = read(size)
1293 # maintain a trailer of the last 20 bytes we've read
1294 n = len(data)
1295 self._offset += n
1296 tn = len(self._trailer)
1297 if n >= 20:
1298 to_pop = tn
1299 to_add = 20
1300 else:
1301 to_pop = max(n + tn - 20, 0)
1302 to_add = n
1303 self.sha.update(
1304 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)]))
1305 )
1306 self._trailer.extend(data[-to_add:])
1308 # hash everything but the trailer
1309 self.sha.update(data[:-to_add])
1310 return data
1312 def _buf_len(self) -> int:
1313 buf = self._rbuf
1314 start = buf.tell()
1315 buf.seek(0, SEEK_END)
1316 end = buf.tell()
1317 buf.seek(start)
1318 return end - start
1320 @property
1321 def offset(self) -> int:
1322 """Return current offset in the stream."""
1323 return self._offset - self._buf_len()
1325 def read(self, size: int) -> bytes:
1326 """Read, blocking until size bytes are read."""
1327 buf_len = self._buf_len()
1328 if buf_len >= size:
1329 return self._rbuf.read(size)
1330 buf_data = self._rbuf.read()
1331 self._rbuf = BytesIO()
1332 return buf_data + self._read(self.read_all, size - buf_len)
1334 def recv(self, size: int) -> bytes:
1335 """Read up to size bytes, blocking until one byte is read."""
1336 buf_len = self._buf_len()
1337 if buf_len:
1338 data = self._rbuf.read(size)
1339 if size >= buf_len:
1340 self._rbuf = BytesIO()
1341 return data
1342 return self._read(self.read_some, size)
1344 def __len__(self) -> int:
1345 """Return the number of objects in this pack."""
1346 return self._num_objects
1348 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]:
1349 """Read the objects in this pack file.
1351 Args:
1352 compute_crc32: If True, compute the CRC32 of the compressed
1353 data. If False, the returned CRC32 will be None.
1354 Returns: Iterator over UnpackedObjects with the following members set:
1355 offset
1356 obj_type_num
1357 obj_chunks (for non-delta types)
1358 delta_base (for delta types)
1359 decomp_chunks
1360 decomp_len
1361 crc32 (if compute_crc32 is True)
1363 Raises:
1364 ChecksumMismatch: if the checksum of the pack contents does not
1365 match the checksum in the pack trailer.
1366 zlib.error: if an error occurred during zlib decompression.
1367 IOError: if an error occurred writing to the output file.
1368 """
1369 _pack_version, self._num_objects = read_pack_header(self.read)
1371 for _ in range(self._num_objects):
1372 offset = self.offset
1373 unpacked, unused = unpack_object(
1374 self.read,
1375 read_some=self.recv,
1376 compute_crc32=compute_crc32,
1377 zlib_bufsize=self._zlib_bufsize,
1378 )
1379 unpacked.offset = offset
1381 # prepend any unused data to current read buffer
1382 buf = BytesIO()
1383 buf.write(unused)
1384 buf.write(self._rbuf.read())
1385 buf.seek(0)
1386 self._rbuf = buf
1388 yield unpacked
1390 if self._buf_len() < 20:
1391 # If the read buffer is full, then the last read() got the whole
1392 # trailer off the wire. If not, it means there is still some of the
1393 # trailer to read. We need to read() all 20 bytes; N come from the
1394 # read buffer and (20 - N) come from the wire.
1395 self.read(20)
1397 pack_sha = bytearray(self._trailer)
1398 if pack_sha != self.sha.digest():
1399 raise ChecksumMismatch(sha_to_hex(bytes(pack_sha)), self.sha.hexdigest())
1402class PackStreamCopier(PackStreamReader):
1403 """Class to verify a pack stream as it is being read.
1405 The pack is read from a ReceivableProtocol using read() or recv() as
1406 appropriate and written out to the given file-like object.
1407 """
1409 def __init__(
1410 self,
1411 read_all: Callable[[int], bytes],
1412 read_some: Optional[Callable[[int], bytes]],
1413 outfile: IO[bytes],
1414 delta_iter: Optional["DeltaChainIterator[UnpackedObject]"] = None,
1415 ) -> None:
1416 """Initialize the copier.
1418 Args:
1419 read_all: Read function that blocks until the number of
1420 requested bytes are read.
1421 read_some: Read function that returns at least one byte, but may
1422 not return the number of bytes requested.
1423 outfile: File-like object to write output through.
1424 delta_iter: Optional DeltaChainIterator to record deltas as we
1425 read them.
1426 """
1427 super().__init__(read_all, read_some=read_some)
1428 self.outfile = outfile
1429 self._delta_iter = delta_iter
1431 def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
1432 """Read data from the read callback and write it to the file."""
1433 data = super()._read(read, size)
1434 self.outfile.write(data)
1435 return data
1437 def verify(self, progress: Optional[Callable[..., None]] = None) -> None:
1438 """Verify a pack stream and write it to the output file.
1440 See PackStreamReader.iterobjects for a list of exceptions this may
1441 throw.
1442 """
1443 i = 0 # default count of entries if read_objects() is empty
1444 for i, unpacked in enumerate(self.read_objects()):
1445 if self._delta_iter:
1446 self._delta_iter.record(unpacked)
1447 if progress is not None:
1448 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii"))
1449 if progress is not None:
1450 progress(f"copied {i} pack entries\n".encode("ascii"))
1453def obj_sha(type: int, chunks: Union[bytes, Iterable[bytes]]) -> bytes:
1454 """Compute the SHA for a numeric type and object chunks."""
1455 sha = sha1()
1456 sha.update(object_header(type, chunks_length(chunks)))
1457 if isinstance(chunks, bytes):
1458 sha.update(chunks)
1459 else:
1460 for chunk in chunks:
1461 sha.update(chunk)
1462 return sha.digest()
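# Hedged illustration (not part of dulwich): obj_sha prepends the standard
# "<type> <length>\0" header before hashing, so an empty blob (type number 3)
# reproduces git's well-known empty-blob id.
def _example_obj_sha() -> None:
    assert sha_to_hex(obj_sha(3, [b""])) == b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"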
1465def compute_file_sha(
1466 f: IO[bytes], start_ofs: int = 0, end_ofs: int = 0, buffer_size: int = 1 << 16
1467) -> "HashObject":
1468 """Hash a portion of a file into a new SHA.
1470 Args:
1471 f: A file-like object to read from that supports seek().
1472 start_ofs: The offset in the file to start reading at.
1473 end_ofs: The offset in the file to end reading at, relative to the
1474 end of the file.
1475 buffer_size: A buffer size for reading.
1476 Returns: A new SHA object updated with data read from the file.
1477 """
1478 sha = sha1()
1479 f.seek(0, SEEK_END)
1480 length = f.tell()
1481 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length:
1482 raise AssertionError(
1483 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}"
1484 )
1485 todo = length + end_ofs - start_ofs
1486 f.seek(start_ofs)
1487 while todo:
1488 data = f.read(min(todo, buffer_size))
1489 sha.update(data)
1490 todo -= len(data)
1491 return sha
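# Hedged illustration (not part of dulwich): a negative end_ofs leaves the last
# bytes of the file out of the hash, which is how the 20-byte pack checksum
# trailer is skipped elsewhere in this module. The in-memory file is synthetic.
def _example_compute_file_sha() -> None:
    from hashlib import sha1 as _sha1
    from io import BytesIO

    f = BytesIO(b"0123456789")
    assert compute_file_sha(f, end_ofs=-4).digest() == _sha1(b"012345").digest()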
1494class PackData:
1495 """The data contained in a packfile.
1497 Pack files can be accessed both sequentially for exploding a pack, and
1498 directly with the help of an index to retrieve a specific object.
1500 The objects within are either complete or a delta against another.
1502 The header is variable length. If the MSB of each byte is set then it
1503 indicates that the subsequent byte is still part of the header.
1504 For the first byte the next three MS bits are the type, which tells you the
1505 type of object, and whether it is a delta. The four LS bits are the lowest
1506 bits of the size. For each subsequent byte the LS 7 bits are the next MS
1507 bits of the size, i.e. the last byte of the header contains the MS bits of the size.
1509 For the complete objects the data is stored as zlib deflated data.
1510 The size in the header is the uncompressed object size, so to uncompress
1511 you need to just keep feeding data to zlib until you get an object back,
1512 or it errors on bad data. This is done here by just giving the complete
1513 buffer from the start of the deflated object on. This is bad, but until I
1514 get mmap sorted out it will have to do.
1516 Currently there are no integrity checks done. Also no attempt is made to
1517 try and detect the delta case, or a request for an object at the wrong
1518 position. It will all just throw a zlib or KeyError.
1519 """
1521 def __init__(
1522 self,
1523 filename: Union[str, os.PathLike[str]],
1524 file: Optional[IO[bytes]] = None,
1525 size: Optional[int] = None,
1526 *,
1527 delta_window_size: Optional[int] = None,
1528 window_memory: Optional[int] = None,
1529 delta_cache_size: Optional[int] = None,
1530 depth: Optional[int] = None,
1531 threads: Optional[int] = None,
1532 big_file_threshold: Optional[int] = None,
1533 ) -> None:
1534 """Create a PackData object representing the pack in the given filename.
1536 The file must exist and stay readable until the object is disposed of.
1537 It must also stay the same size. It will be mapped whenever needed.
1539 Currently there is a restriction on the size of the pack as the python
1540 mmap implementation is flawed.
1541 """
1542 self._filename = filename
1543 self._size = size
1544 self._header_size = 12
1545 self.delta_window_size = delta_window_size
1546 self.window_memory = window_memory
1547 self.delta_cache_size = delta_cache_size
1548 self.depth = depth
1549 self.threads = threads
1550 self.big_file_threshold = big_file_threshold
1551 self._file: IO[bytes]
1553 if file is None:
1554 self._file = GitFile(self._filename, "rb")
1555 else:
1556 self._file = file
1557 (_version, self._num_objects) = read_pack_header(self._file.read)
1559 # Use delta_cache_size config if available, otherwise default
1560 cache_size = delta_cache_size or (1024 * 1024 * 20)
1561 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]](
1562 cache_size, compute_size=_compute_object_size
1563 )
1565 @property
1566 def filename(self) -> str:
1567 """Get the filename of the pack file.
1569 Returns:
1570 Base filename without directory path
1571 """
1572 return os.path.basename(self._filename)
1574 @property
1575 def path(self) -> Union[str, os.PathLike[str]]:
1576 """Get the full path of the pack file.
1578 Returns:
1579 Full path to the pack file
1580 """
1581 return self._filename
1583 @classmethod
1584 def from_file(cls, file: IO[bytes], size: Optional[int] = None) -> "PackData":
1585 """Create a PackData object from an open file.
1587 Args:
1588 file: Open file object
1589 size: Optional file size
1591 Returns:
1592 PackData instance
1593 """
1594 return cls(str(file), file=file, size=size)
1596 @classmethod
1597 def from_path(cls, path: Union[str, os.PathLike[str]]) -> "PackData":
1598 """Create a PackData object from a file path.
1600 Args:
1601 path: Path to the pack file
1603 Returns:
1604 PackData instance
1605 """
1606 return cls(filename=path)
1608 def close(self) -> None:
1609 """Close the underlying pack file."""
1610 self._file.close()
1612 def __enter__(self) -> "PackData":
1613 """Enter context manager."""
1614 return self
1616 def __exit__(
1617 self,
1618 exc_type: Optional[type],
1619 exc_val: Optional[BaseException],
1620 exc_tb: Optional[TracebackType],
1621 ) -> None:
1622 """Exit context manager."""
1623 self.close()
1625 def __eq__(self, other: object) -> bool:
1626 """Check equality with another object."""
1627 if isinstance(other, PackData):
1628 return self.get_stored_checksum() == other.get_stored_checksum()
1629 return False
1631 def _get_size(self) -> int:
1632 if self._size is not None:
1633 return self._size
1634 self._size = os.path.getsize(self._filename)
1635 if self._size < self._header_size:
1636 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})"
1637 raise AssertionError(errmsg)
1638 return self._size
1640 def __len__(self) -> int:
1641 """Returns the number of objects in this pack."""
1642 return self._num_objects
1644 def calculate_checksum(self) -> bytes:
1645 """Calculate the checksum for this pack.
1647 Returns: 20-byte binary SHA1 digest
1648 """
1649 return compute_file_sha(self._file, end_ofs=-20).digest()
1651 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]:
1652 """Iterate over unpacked objects in the pack."""
1653 self._file.seek(self._header_size)
1655 if self._num_objects is None:
1656 return
1658 for _ in range(self._num_objects):
1659 offset = self._file.tell()
1660 unpacked, unused = unpack_object(
1661 self._file.read, compute_crc32=False, include_comp=include_comp
1662 )
1663 unpacked.offset = offset
1664 yield unpacked
1665 # Back up over unused data.
1666 self._file.seek(-len(unused), SEEK_CUR)
1668 def iterentries(
1669 self,
1670 progress: Optional[Callable[[int, int], None]] = None,
1671 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1672 ) -> Iterator[tuple[bytes, int, Optional[int]]]:
1673 """Yield entries summarizing the contents of this pack.
1675 Args:
1676 progress: Progress function, called with current and total
1677 object count.
1678 resolve_ext_ref: Optional function to resolve external references
1679 Returns: iterator of tuples with (sha, offset, crc32)
1680 """
1681 num_objects = self._num_objects
1682 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref)
1683 for i, result in enumerate(indexer):
1684 if progress is not None:
1685 progress(i, num_objects)
1686 yield result
1688 def sorted_entries(
1689 self,
1690 progress: Optional[ProgressFn] = None,
1691 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1692 ) -> list[tuple[bytes, int, int]]:
1693 """Return entries in this pack, sorted by SHA.
1695 Args:
1696 progress: Progress function, called with current and total
1697 object count
1698 resolve_ext_ref: Optional function to resolve external references
1699 Returns: List of tuples with (sha, offset, crc32)
1700 """
1701 return sorted(
1702 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) # type: ignore
1703 )
1705 def create_index_v1(
1706 self,
1707 filename: str,
1708 progress: Optional[Callable[..., None]] = None,
1709 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1710 ) -> bytes:
1711 """Create a version 1 file for this data file.
1713 Args:
1714 filename: Index filename.
1715 progress: Progress report function
1716 resolve_ext_ref: Optional function to resolve external references
1717 Returns: Checksum of index file
1718 """
1719 entries = self.sorted_entries(
1720 progress=progress, resolve_ext_ref=resolve_ext_ref
1721 )
1722 checksum = self.calculate_checksum()
1723 with GitFile(filename, "wb") as f:
1724 write_pack_index_v1(
1725 f,
1726 entries,
1727 checksum,
1728 )
1729 return checksum
1731 def create_index_v2(
1732 self,
1733 filename: str,
1734 progress: Optional[Callable[..., None]] = None,
1735 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1736 ) -> bytes:
1737 """Create a version 2 index file for this data file.
1739 Args:
1740 filename: Index filename.
1741 progress: Progress report function
1742 resolve_ext_ref: Optional function to resolve external references
1743 Returns: Checksum of index file
1744 """
1745 entries = self.sorted_entries(
1746 progress=progress, resolve_ext_ref=resolve_ext_ref
1747 )
1748 with GitFile(filename, "wb") as f:
1749 return write_pack_index_v2(f, entries, self.calculate_checksum())
1751 def create_index_v3(
1752 self,
1753 filename: str,
1754 progress: Optional[Callable[..., None]] = None,
1755 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1756 hash_algorithm: int = 1,
1757 ) -> bytes:
1758 """Create a version 3 index file for this data file.
1760 Args:
1761 filename: Index filename.
1762 progress: Progress report function
1763 resolve_ext_ref: Function to resolve external references
1764 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
1765 Returns: Checksum of index file
1766 """
1767 entries = self.sorted_entries(
1768 progress=progress, resolve_ext_ref=resolve_ext_ref
1769 )
1770 with GitFile(filename, "wb") as f:
1771 return write_pack_index_v3(
1772 f, entries, self.calculate_checksum(), hash_algorithm
1773 )
1775 def create_index(
1776 self,
1777 filename: str,
1778 progress: Optional[Callable[..., None]] = None,
1779 version: int = 2,
1780 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1781 hash_algorithm: int = 1,
1782 ) -> bytes:
1783 """Create an index file for this data file.
1785 Args:
1786 filename: Index filename.
1787 progress: Progress report function
1788 version: Index version (1, 2, or 3)
1789 resolve_ext_ref: Function to resolve external references
1790 hash_algorithm: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256)
1791 Returns: Checksum of index file
1792 """
1793 if version == 1:
1794 return self.create_index_v1(
1795 filename, progress, resolve_ext_ref=resolve_ext_ref
1796 )
1797 elif version == 2:
1798 return self.create_index_v2(
1799 filename, progress, resolve_ext_ref=resolve_ext_ref
1800 )
1801 elif version == 3:
1802 return self.create_index_v3(
1803 filename,
1804 progress,
1805 resolve_ext_ref=resolve_ext_ref,
1806 hash_algorithm=hash_algorithm,
1807 )
1808 else:
1809 raise ValueError(f"unknown index format {version}")
1811 def get_stored_checksum(self) -> bytes:
1812 """Return the expected checksum stored in this pack."""
1813 self._file.seek(-20, SEEK_END)
1814 return self._file.read(20)
1816 def check(self) -> None:
1817 """Check the consistency of this pack."""
1818 actual = self.calculate_checksum()
1819 stored = self.get_stored_checksum()
1820 if actual != stored:
1821 raise ChecksumMismatch(stored, actual)
1823 def get_unpacked_object_at(
1824 self, offset: int, *, include_comp: bool = False
1825 ) -> UnpackedObject:
1826 """Given offset in the packfile return a UnpackedObject."""
1827 assert offset >= self._header_size
1828 self._file.seek(offset)
1829 unpacked, _ = unpack_object(self._file.read, include_comp=include_comp)
1830 unpacked.offset = offset
1831 return unpacked
1833 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]:
1834 """Given an offset in to the packfile return the object that is there.
1836 Using the associated index the location of an object can be looked up,
1837 and then the packfile can be asked directly for that object using this
1838 function.
1839 """
1840 try:
1841 return self._offset_cache[offset]
1842 except KeyError:
1843 pass
1844 unpacked = self.get_unpacked_object_at(offset, include_comp=False)
1845 return (unpacked.pack_type_num, unpacked._obj())
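# Editorial sketch (not part of dulwich): typical use of the create_index*
# methods defined above. The pack path is hypothetical and assumes a finished
# pack file already exists on disk next to where the index should go.
def _example_build_index_for_pack_data() -> bytes:
    data = PackData("objects/pack/pack-1234.pack")  # hypothetical path
    try:
        # version=2 is the default; version=1 or version=3 dispatch to
        # create_index_v1 / create_index_v3 instead.
        return data.create_index("objects/pack/pack-1234.idx", version=2)
    finally:
        data.close()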
1848T = TypeVar("T")
1851class DeltaChainIterator(Generic[T]):
1852 """Abstract iterator over pack data based on delta chains.
1854 Each object in the pack is guaranteed to be inflated exactly once,
1855 regardless of how many objects reference it as a delta base. As a result,
1856 memory usage is proportional to the length of the longest delta chain.
1858 Subclasses can override _result to define the result type of the iterator.
1859 By default, results are UnpackedObjects with the following members set:
1861 * offset
1862 * obj_type_num
1863 * obj_chunks
1864 * pack_type_num
1865 * delta_base (for delta types)
1866 * comp_chunks (if _include_comp is True)
1867 * decomp_chunks
1868 * decomp_len
1869 * crc32 (if _compute_crc32 is True)
1870 """
1872 _compute_crc32 = False
1873 _include_comp = False
1875 def __init__(
1876 self,
1877 file_obj: Optional[IO[bytes]],
1878 *,
1879 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1880 ) -> None:
1881 """Initialize DeltaChainIterator.
1883 Args:
1884 file_obj: File object to read pack data from
1885 resolve_ext_ref: Optional function to resolve external references
1886 """
1887 self._file = file_obj
1888 self._resolve_ext_ref = resolve_ext_ref
1889 self._pending_ofs: dict[int, list[int]] = defaultdict(list)
1890 self._pending_ref: dict[bytes, list[int]] = defaultdict(list)
1891 self._full_ofs: list[tuple[int, int]] = []
1892 self._ext_refs: list[bytes] = []
1894 @classmethod
1895 def for_pack_data(
1896 cls, pack_data: PackData, resolve_ext_ref: Optional[ResolveExtRefFn] = None
1897 ) -> "DeltaChainIterator[T]":
1898 """Create a DeltaChainIterator from pack data.
1900 Args:
1901 pack_data: PackData object to iterate
1902 resolve_ext_ref: Optional function to resolve external refs
1904 Returns:
1905 DeltaChainIterator instance
1906 """
1907 walker = cls(None, resolve_ext_ref=resolve_ext_ref)
1908 walker.set_pack_data(pack_data)
1909 for unpacked in pack_data.iter_unpacked(include_comp=False):
1910 walker.record(unpacked)
1911 return walker
1913 @classmethod
1914 def for_pack_subset(
1915 cls,
1916 pack: "Pack",
1917 shas: Iterable[bytes],
1918 *,
1919 allow_missing: bool = False,
1920 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
1921 ) -> "DeltaChainIterator[T]":
1922 """Create a DeltaChainIterator for a subset of objects.
1924 Args:
1925 pack: Pack object containing the data
1926 shas: Iterable of object SHAs to include
1927 allow_missing: If True, skip missing objects
1928 resolve_ext_ref: Optional function to resolve external refs
1930 Returns:
1931 DeltaChainIterator instance
1932 """
1933 walker = cls(None, resolve_ext_ref=resolve_ext_ref)
1934 walker.set_pack_data(pack.data)
1935 todo = set()
1936 for sha in shas:
1937 assert isinstance(sha, bytes)
1938 try:
1939 off = pack.index.object_offset(sha)
1940 except KeyError:
1941 if not allow_missing:
1942 raise
1943 else:
1944 todo.add(off)
1945 done = set()
1946 while todo:
1947 off = todo.pop()
1948 unpacked = pack.data.get_unpacked_object_at(off)
1949 walker.record(unpacked)
1950 done.add(off)
1951 base_ofs = None
1952 if unpacked.pack_type_num == OFS_DELTA:
1953 assert unpacked.offset is not None
1954 assert unpacked.delta_base is not None
1955 assert isinstance(unpacked.delta_base, int)
1956 base_ofs = unpacked.offset - unpacked.delta_base
1957 elif unpacked.pack_type_num == REF_DELTA:
1958 with suppress(KeyError):
1959 assert isinstance(unpacked.delta_base, bytes)
1960 base_ofs = pack.index.object_index(unpacked.delta_base)
1961 if base_ofs is not None and base_ofs not in done:
1962 todo.add(base_ofs)
1963 return walker
1965 def record(self, unpacked: UnpackedObject) -> None:
1966 """Record an unpacked object for later processing.
1968 Args:
1969 unpacked: UnpackedObject to record
1970 """
1971 type_num = unpacked.pack_type_num
1972 offset = unpacked.offset
1973 assert offset is not None
1974 if type_num == OFS_DELTA:
1975 assert unpacked.delta_base is not None
1976 assert isinstance(unpacked.delta_base, int)
1977 base_offset = offset - unpacked.delta_base
1978 self._pending_ofs[base_offset].append(offset)
1979 elif type_num == REF_DELTA:
1980 assert isinstance(unpacked.delta_base, bytes)
1981 self._pending_ref[unpacked.delta_base].append(offset)
1982 else:
1983 self._full_ofs.append((offset, type_num))
1985 def set_pack_data(self, pack_data: PackData) -> None:
1986 """Set the pack data for iteration.
1988 Args:
1989 pack_data: PackData object to use
1990 """
1991 self._file = pack_data._file
1993 def _walk_all_chains(self) -> Iterator[T]:
1994 for offset, type_num in self._full_ofs:
1995 yield from self._follow_chain(offset, type_num, None)
1996 yield from self._walk_ref_chains()
1997 assert not self._pending_ofs, repr(self._pending_ofs)
1999 def _ensure_no_pending(self) -> None:
2000 if self._pending_ref:
2001 raise UnresolvedDeltas([sha_to_hex(s) for s in self._pending_ref])
2003 def _walk_ref_chains(self) -> Iterator[T]:
2004 if not self._resolve_ext_ref:
2005 self._ensure_no_pending()
2006 return
2008 for base_sha, pending in sorted(self._pending_ref.items()):
2009 if base_sha not in self._pending_ref:
2010 continue
2011 try:
2012 type_num, chunks = self._resolve_ext_ref(base_sha)
2013 except KeyError:
2014 # Not an external ref, but may depend on one. Either it will
2015 # get popped via a _follow_chain call, or we will raise an
2016 # error below.
2017 continue
2018 self._ext_refs.append(base_sha)
2019 self._pending_ref.pop(base_sha)
2020 for new_offset in pending:
2021 yield from self._follow_chain(new_offset, type_num, chunks) # type: ignore[arg-type]
2023 self._ensure_no_pending()
2025 def _result(self, unpacked: UnpackedObject) -> T:
2026 raise NotImplementedError
2028 def _resolve_object(
2029 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]]
2030 ) -> UnpackedObject:
2031 assert self._file is not None
2032 self._file.seek(offset)
2033 unpacked, _ = unpack_object(
2034 self._file.read,
2035 include_comp=self._include_comp,
2036 compute_crc32=self._compute_crc32,
2037 )
2038 unpacked.offset = offset
2039 if base_chunks is None:
2040 assert unpacked.pack_type_num == obj_type_num
2041 else:
2042 assert unpacked.pack_type_num in DELTA_TYPES
2043 unpacked.obj_type_num = obj_type_num
2044 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks)
2045 return unpacked
2047 def _follow_chain(
2048 self, offset: int, obj_type_num: int, base_chunks: Optional[list[bytes]]
2049 ) -> Iterator[T]:
2050 # Unlike PackData.get_object_at, there is no need to cache offsets as
2051 # this approach by design inflates each object exactly once.
2052 todo = [(offset, obj_type_num, base_chunks)]
2053 while todo:
2054 (offset, obj_type_num, base_chunks) = todo.pop()
2055 unpacked = self._resolve_object(offset, obj_type_num, base_chunks)
2056 yield self._result(unpacked)
2058 assert unpacked.offset is not None
2059 unblocked = chain(
2060 self._pending_ofs.pop(unpacked.offset, []),
2061 self._pending_ref.pop(unpacked.sha(), []),
2062 )
2063 todo.extend(
2064 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore
2065 for new_offset in unblocked
2066 )
2068 def __iter__(self) -> Iterator[T]:
2069 """Iterate over objects in the pack."""
2070 return self._walk_all_chains()
2072 def ext_refs(self) -> list[bytes]:
2073 """Return external references."""
2074 return self._ext_refs
2077class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]):
2078 """Delta chain iterator that yield unpacked objects."""
2080 def _result(self, unpacked: UnpackedObject) -> UnpackedObject:
2081 """Return the unpacked object.
2083 Args:
2084 unpacked: The unpacked object
2086 Returns:
2087 The unpacked object unchanged
2088 """
2089 return unpacked
2092class PackIndexer(DeltaChainIterator[PackIndexEntry]):
2093 """Delta chain iterator that yields index entries."""
2095 _compute_crc32 = True
2097 def _result(self, unpacked: UnpackedObject) -> tuple[bytes, int, Optional[int]]:
2098 """Convert unpacked object to pack index entry.
2100 Args:
2101 unpacked: The unpacked object
2103 Returns:
2104 Tuple of (sha, offset, crc32) for index entry
2105 """
2106 assert unpacked.offset is not None
2107 return unpacked.sha(), unpacked.offset, unpacked.crc32
2110class PackInflater(DeltaChainIterator[ShaFile]):
2111 """Delta chain iterator that yields ShaFile objects."""
2113 def _result(self, unpacked: UnpackedObject) -> ShaFile:
2114 """Convert unpacked object to ShaFile.
2116 Args:
2117 unpacked: The unpacked object
2119 Returns:
2120 ShaFile object from the unpacked data
2121 """
2122 return unpacked.sha_file()
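# Editorial sketch (not part of dulwich): the DeltaChainIterator subclasses
# above are all driven through for_pack_data(); only their _result()
# conversion differs. The pack path is hypothetical.
def _example_iterate_pack_contents() -> None:
    data = PackData("objects/pack/pack-1234.pack")  # hypothetical path
    try:
        # PackIndexer yields (sha, offset, crc32) entries suitable for writing
        # an index; PackInflater would yield ShaFile objects instead.
        for sha, offset, crc32 in PackIndexer.for_pack_data(data):
            print(sha_to_hex(sha), offset, crc32)
    finally:
        data.close()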
2125class SHA1Reader(BinaryIO):
2126 """Wrapper for file-like object that remembers the SHA1 of its data."""
2128 def __init__(self, f: IO[bytes]) -> None:
2129 """Initialize SHA1Reader.
2131 Args:
2132 f: File-like object to wrap
2133 """
2134 self.f = f
2135 self.sha1 = sha1(b"")
2137 def read(self, size: int = -1) -> bytes:
2138 """Read bytes and update SHA1.
2140 Args:
2141 size: Number of bytes to read, -1 for all
2143 Returns:
2144 Bytes read from file
2145 """
2146 data = self.f.read(size)
2147 self.sha1.update(data)
2148 return data
2150 def check_sha(self, allow_empty: bool = False) -> None:
2151 """Check if the SHA1 matches the expected value.
2153 Args:
2154 allow_empty: Allow empty SHA1 hash
2156 Raises:
2157 ChecksumMismatch: If SHA1 doesn't match
2158 """
2159 stored = self.f.read(20)
2160 # If the git option index.skipHash is set, the stored hash is all zeros
2161 if stored != self.sha1.digest() and (
2162 not allow_empty
2163 or sha_to_hex(stored) != b"0000000000000000000000000000000000000000"
2164 ):
2165 raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored))
2167 def close(self) -> None:
2168 """Close the underlying file."""
2169 return self.f.close()
2171 def tell(self) -> int:
2172 """Return current file position."""
2173 return self.f.tell()
2175 # BinaryIO abstract methods
2176 def readable(self) -> bool:
2177 """Check if file is readable."""
2178 return True
2180 def writable(self) -> bool:
2181 """Check if file is writable."""
2182 return False
2184 def seekable(self) -> bool:
2185 """Check if file is seekable."""
2186 return getattr(self.f, "seekable", lambda: False)()
2188 def seek(self, offset: int, whence: int = 0) -> int:
2189 """Seek to position in file.
2191 Args:
2192 offset: Position offset
2193 whence: Reference point (0=start, 1=current, 2=end)
2195 Returns:
2196 New file position
2197 """
2198 return self.f.seek(offset, whence)
2200 def flush(self) -> None:
2201 """Flush the file buffer."""
2202 if hasattr(self.f, "flush"):
2203 self.f.flush()
2205 def readline(self, size: int = -1) -> bytes:
2206 """Read a line from the file.
2208 Args:
2209 size: Maximum bytes to read
2211 Returns:
2212 Line read from file
2213 """
2214 return self.f.readline(size)
2216 def readlines(self, hint: int = -1) -> list[bytes]:
2217 """Read all lines from the file.
2219 Args:
2220 hint: Approximate number of bytes to read
2222 Returns:
2223 List of lines
2224 """
2225 return self.f.readlines(hint)
2227 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override]
2228 """Write multiple lines to the file (not supported)."""
2229 raise UnsupportedOperation("writelines")
2231 def write(self, data: bytes, /) -> int: # type: ignore[override]
2232 """Write data to the file (not supported)."""
2233 raise UnsupportedOperation("write")
2235 def __enter__(self) -> "SHA1Reader":
2236 """Enter context manager."""
2237 return self
2239 def __exit__(
2240 self,
2241 type: Optional[type],
2242 value: Optional[BaseException],
2243 traceback: Optional[TracebackType],
2244 ) -> None:
2245 """Exit context manager and close file."""
2246 self.close()
2248 def __iter__(self) -> "SHA1Reader":
2249 """Return iterator for reading file lines."""
2250 return self
2252 def __next__(self) -> bytes:
2253 """Get next line from file.
2255 Returns:
2256 Next line
2258 Raises:
2259 StopIteration: When no more lines
2260 """
2261 line = self.readline()
2262 if not line:
2263 raise StopIteration
2264 return line
2266 def fileno(self) -> int:
2267 """Return file descriptor number."""
2268 return self.f.fileno()
2270 def isatty(self) -> bool:
2271 """Check if file is a terminal."""
2272 return getattr(self.f, "isatty", lambda: False)()
2274 def truncate(self, size: Optional[int] = None) -> int:
2275 """Not supported for read-only file.
2277 Raises:
2278 UnsupportedOperation: Always raised
2279 """
2280 raise UnsupportedOperation("truncate")
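# Editorial sketch (not part of dulwich): SHA1Reader is intended for files
# that end in a 20-byte SHA-1 trailer, such as pack indexes. Read the body
# through the wrapper, then let check_sha() compare the running digest with
# the stored trailer. The path is hypothetical.
def _example_verify_trailing_checksum(path: str = "objects/pack/pack-1234.idx") -> None:
    with open(path, "rb") as raw:
        size = os.fstat(raw.fileno()).st_size
        reader = SHA1Reader(raw)
        reader.read(size - 20)  # hash everything except the stored trailer
        reader.check_sha()  # raises ChecksumMismatch on corruption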
2283class SHA1Writer(BinaryIO):
2284 """Wrapper for file-like object that remembers the SHA1 of its data."""
2286 def __init__(self, f: Union[BinaryIO, IO[bytes]]) -> None:
2287 """Initialize SHA1Writer.
2289 Args:
2290 f: File-like object to wrap
2291 """
2292 self.f = f
2293 self.length = 0
2294 self.sha1 = sha1(b"")
2295 self.digest: Optional[bytes] = None
2297 def write(self, data: Union[bytes, bytearray, memoryview], /) -> int: # type: ignore[override]
2298 """Write data and update SHA1.
2300 Args:
2301 data: Data to write
2303 Returns:
2304 Number of bytes written
2305 """
2306 self.sha1.update(data)
2307 written = self.f.write(data)
2308 self.length += written
2309 return written
2311 def write_sha(self) -> bytes:
2312 """Write the SHA1 digest to the file.
2314 Returns:
2315 The SHA1 digest bytes
2316 """
2317 sha = self.sha1.digest()
2318 assert len(sha) == 20
2319 self.f.write(sha)
2320 self.length += len(sha)
2321 return sha
2323 def close(self) -> None:
2324 """Close the pack file and finalize the SHA."""
2325 self.digest = self.write_sha()
2326 self.f.close()
2328 def offset(self) -> int:
2329 """Get the total number of bytes written.
2331 Returns:
2332 Total bytes written
2333 """
2334 return self.length
2336 def tell(self) -> int:
2337 """Return current file position."""
2338 return self.f.tell()
2340 # BinaryIO abstract methods
2341 def readable(self) -> bool:
2342 """Check if file is readable."""
2343 return False
2345 def writable(self) -> bool:
2346 """Check if file is writable."""
2347 return True
2349 def seekable(self) -> bool:
2350 """Check if file is seekable."""
2351 return getattr(self.f, "seekable", lambda: False)()
2353 def seek(self, offset: int, whence: int = 0) -> int:
2354 """Seek to position in file.
2356 Args:
2357 offset: Position offset
2358 whence: Reference point (0=start, 1=current, 2=end)
2360 Returns:
2361 New file position
2362 """
2363 return self.f.seek(offset, whence)
2365 def flush(self) -> None:
2366 """Flush the file buffer."""
2367 if hasattr(self.f, "flush"):
2368 self.f.flush()
2370 def readline(self, size: int = -1) -> bytes:
2371 """Not supported for write-only file.
2373 Raises:
2374 UnsupportedOperation: Always raised
2375 """
2376 raise UnsupportedOperation("readline")
2378 def readlines(self, hint: int = -1) -> list[bytes]:
2379 """Not supported for write-only file.
2381 Raises:
2382 UnsupportedOperation: Always raised
2383 """
2384 raise UnsupportedOperation("readlines")
2386 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override]
2387 """Write multiple lines to the file.
2389 Args:
2390 lines: Iterable of lines to write
2391 """
2392 for line in lines:
2393 self.write(line)
2395 def read(self, size: int = -1) -> bytes:
2396 """Not supported for write-only file.
2398 Raises:
2399 UnsupportedOperation: Always raised
2400 """
2401 raise UnsupportedOperation("read")
2403 def __enter__(self) -> "SHA1Writer":
2404 """Enter context manager."""
2405 return self
2407 def __exit__(
2408 self,
2409 type: Optional[type],
2410 value: Optional[BaseException],
2411 traceback: Optional[TracebackType],
2412 ) -> None:
2413 """Exit context manager and close file."""
2414 self.close()
2416 def __iter__(self) -> "SHA1Writer":
2417 """Return iterator."""
2418 return self
2420 def __next__(self) -> bytes:
2421 """Not supported for write-only file.
2423 Raises:
2424 UnsupportedOperation: Always raised
2425 """
2426 raise UnsupportedOperation("__next__")
2428 def fileno(self) -> int:
2429 """Return file descriptor number."""
2430 return self.f.fileno()
2432 def isatty(self) -> bool:
2433 """Check if file is a terminal."""
2434 return getattr(self.f, "isatty", lambda: False)()
2436 def truncate(self, size: Optional[int] = None) -> int:
2437 """Not supported for write-only file.
2439 Raises:
2440 UnsupportedOperation: Always raised
2441 """
2442 raise UnsupportedOperation("truncate")
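# Editorial sketch (not part of dulwich): SHA1Writer appends the SHA-1 of
# everything written when close() is called, which is the trailer format used
# by pack and index files. The output path is illustrative only.
def _example_write_with_trailer(path: str = "/tmp/example-with-trailer.bin") -> bytes:
    f = SHA1Writer(open(path, "wb"))
    f.write(b"some payload")
    f.close()  # writes the 20-byte digest, then closes the underlying file
    assert f.digest is not None
    return f.digest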
2445def pack_object_header(
2446 type_num: int, delta_base: Optional[Union[bytes, int]], size: int
2447) -> bytearray:
2448 """Create a pack object header for the given object info.
2450 Args:
2451 type_num: Numeric type of the object.
2452 delta_base: Delta base offset or ref, or None for whole objects.
2453 size: Uncompressed object size.
2454 Returns: A header for a packed object.
2455 """
2456 header = []
2457 c = (type_num << 4) | (size & 15)
2458 size >>= 4
2459 while size:
2460 header.append(c | 0x80)
2461 c = size & 0x7F
2462 size >>= 7
2463 header.append(c)
2464 if type_num == OFS_DELTA:
2465 assert isinstance(delta_base, int)
2466 ret = [delta_base & 0x7F]
2467 delta_base >>= 7
2468 while delta_base:
2469 delta_base -= 1
2470 ret.insert(0, 0x80 | (delta_base & 0x7F))
2471 delta_base >>= 7
2472 header.extend(ret)
2473 elif type_num == REF_DELTA:
2474 assert isinstance(delta_base, bytes)
2475 assert len(delta_base) == 20
2476 header += delta_base
2477 return bytearray(header)
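# Editorial sketch (not part of dulwich): a worked example of the header
# layout produced by pack_object_header(). The first byte carries the type in
# bits 4-6 and the low four size bits; further size bits follow seven at a
# time while the continuation bit (0x80) is set.
def _example_pack_object_header() -> None:
    # Type 3 is a blob; a 100-byte whole object encodes as two bytes:
    # 0xb4 = continuation bit | type 3 | size low nibble 4, 0x06 = 100 >> 4.
    assert pack_object_header(3, None, 100) == bytearray(b"\xb4\x06")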
2480def pack_object_chunks(
2481 type: int,
2482 object: Union[list[bytes], tuple[Union[bytes, int], list[bytes]]],
2483 compression_level: int = -1,
2484) -> Iterator[bytes]:
2485 """Generate chunks for a pack object.
2487 Args:
2488 type: Numeric type of the object
2489 object: Object to write
2490 compression_level: the zlib compression level
2491 Returns: Chunks
2492 """
2493 if type in DELTA_TYPES:
2494 if isinstance(object, tuple):
2495 delta_base, object = object
2496 else:
2497 raise TypeError("Delta types require a tuple of (delta_base, object)")
2498 else:
2499 delta_base = None
2501 # Convert object to list of bytes chunks
2502 if isinstance(object, bytes):
2503 chunks = [object]
2504 elif isinstance(object, list):
2505 chunks = object
2506 elif isinstance(object, ShaFile):
2507 chunks = object.as_raw_chunks()
2508 else:
2509 # Shouldn't reach here with proper typing
2510 raise TypeError(f"Unexpected object type: {object.__class__.__name__}")
2512 yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks))))
2513 compressor = zlib.compressobj(level=compression_level)
2514 for data in chunks:
2515 yield compressor.compress(data)
2516 yield compressor.flush()
2519def write_pack_object(
2520 write: Callable[[bytes], int],
2521 type: int,
2522 object: Union[list[bytes], tuple[Union[bytes, int], list[bytes]]],
2523 sha: Optional["HashObject"] = None,
2524 compression_level: int = -1,
2525) -> int:
2526 """Write pack object to a file.
2528 Args:
2529 write: Write function to use
2530 type: Numeric type of the object
2531 object: Object to write
2532 sha: Optional SHA-1 hasher to update
2533 compression_level: the zlib compression level
2534 Returns: CRC32 checksum of the written object
2535 """
2536 crc32 = 0
2537 for chunk in pack_object_chunks(type, object, compression_level=compression_level):
2538 write(chunk)
2539 if sha is not None:
2540 sha.update(chunk)
2541 crc32 = binascii.crc32(chunk, crc32)
2542 return crc32 & 0xFFFFFFFF
2545def write_pack(
2546 filename: str,
2547 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]],
2548 *,
2549 deltify: Optional[bool] = None,
2550 delta_window_size: Optional[int] = None,
2551 compression_level: int = -1,
2552) -> tuple[bytes, bytes]:
2553 """Write a new pack data file.
2555 Args:
2556 filename: Path to the new pack file (without .pack extension)
2557 objects: Objects to write to the pack
2558 delta_window_size: Delta window size
2559 deltify: Whether to deltify pack objects
2560 compression_level: the zlib compression level
2561 Returns: Tuple with checksum of pack file and index file
2562 """
2563 with GitFile(filename + ".pack", "wb") as f:
2564 entries, data_sum = write_pack_objects(
2565 f,
2566 objects,
2567 delta_window_size=delta_window_size,
2568 deltify=deltify,
2569 compression_level=compression_level,
2570 )
2571 entries_list = sorted([(k, v[0], v[1]) for (k, v) in entries.items()])
2572 with GitFile(filename + ".idx", "wb") as f:
2573 idx_sha = write_pack_index(f, entries_list, data_sum)
2574 return data_sum, idx_sha
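# Editorial sketch (not part of dulwich): write_pack() above produces a
# side-by-side .pack/.idx pair. The blobs and output prefix are illustrative.
def _example_write_small_pack() -> tuple[bytes, bytes]:
    from dulwich.objects import Blob

    blobs = [Blob.from_string(b"hello"), Blob.from_string(b"hello world")]
    # Writes /tmp/example-pack.pack and /tmp/example-pack.idx and returns the
    # (pack checksum, index checksum) pair.
    return write_pack("/tmp/example-pack", blobs, deltify=False)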
2577def pack_header_chunks(num_objects: int) -> Iterator[bytes]:
2578 """Yield chunks for a pack header."""
2579 yield b"PACK" # Pack header
2580 yield struct.pack(b">L", 2) # Pack version
2581 yield struct.pack(b">L", num_objects) # Number of objects in pack
2584def write_pack_header(
2585 write: Union[Callable[[bytes], int], IO[bytes]], num_objects: int
2586) -> None:
2587 """Write a pack header for the given number of objects."""
2588 write_fn: Callable[[bytes], int]
2589 if hasattr(write, "write"):
2590 write_fn = write.write
2591 warnings.warn(
2592 "write_pack_header() now takes a write rather than file argument",
2593 DeprecationWarning,
2594 stacklevel=2,
2595 )
2596 else:
2597 write_fn = write
2598 for chunk in pack_header_chunks(num_objects):
2599 write_fn(chunk)
2602def find_reusable_deltas(
2603 container: PackedObjectContainer,
2604 object_ids: Set[bytes],
2605 *,
2606 other_haves: Optional[Set[bytes]] = None,
2607 progress: Optional[Callable[..., None]] = None,
2608) -> Iterator[UnpackedObject]:
2609 """Find deltas in a pack that can be reused.
2611 Args:
2612 container: Pack container to search for deltas
2613 object_ids: Set of object IDs to find deltas for
2614 other_haves: Set of other object IDs we have
2615 progress: Optional progress reporting callback
2617 Returns:
2618 Iterator of UnpackedObject entries that can be reused
2619 """
2620 if other_haves is None:
2621 other_haves = set()
2622 reused = 0
2623 for i, unpacked in enumerate(
2624 container.iter_unpacked_subset(
2625 object_ids, allow_missing=True, convert_ofs_delta=True
2626 )
2627 ):
2628 if progress is not None and i % 1000 == 0:
2629 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode())
2630 if unpacked.pack_type_num == REF_DELTA:
2631 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore
2632 if hexsha in object_ids or hexsha in other_haves:
2633 yield unpacked
2634 reused += 1
2635 if progress is not None:
2636 progress((f"found {reused} deltas to reuse\n").encode())
2639def deltify_pack_objects(
2640 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[bytes]]]],
2641 *,
2642 window_size: Optional[int] = None,
2643 progress: Optional[Callable[..., None]] = None,
2644) -> Iterator[UnpackedObject]:
2645 """Generate deltas for pack objects.
2647 Args:
2648 objects: An iterable of (object, path) tuples to deltify.
2649 window_size: Window size; None for default
2650 progress: Optional progress reporting callback
2651 Returns: Iterator over type_num, object id, delta_base, content
2652 delta_base is None for full text entries
2653 """
2655 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, Optional[bytes]]]]:
2656 for e in objects:
2657 if isinstance(e, ShaFile):
2658 yield (e, (e.type_num, None))
2659 else:
2660 yield (e[0], (e[0].type_num, e[1]))
2662 sorted_objs = sort_objects_for_delta(objects_with_hints())
2663 yield from deltas_from_sorted_objects(
2664 sorted_objs,
2665 window_size=window_size,
2666 progress=progress,
2667 )
2670def sort_objects_for_delta(
2671 objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[PackHint]]]],
2672) -> Iterator[tuple[ShaFile, Optional[bytes]]]:
2673 """Sort objects for optimal delta compression.
2675 Args:
2676 objects: Iterator of objects or (object, hint) tuples
2678 Returns:
2679 Iterator of sorted (ShaFile, path) tuples
2680 """
2681 magic = []
2682 for entry in objects:
2683 if isinstance(entry, tuple):
2684 obj, hint = entry
2685 if hint is None:
2686 type_num = None
2687 path = None
2688 else:
2689 (type_num, path) = hint
2690 else:
2691 obj = entry
2692 type_num = None
2693 path = None
2694 magic.append((type_num, path, -obj.raw_length(), obj))
2695 # Build a list of objects ordered by the magic Linus heuristic
2696 # This helps us find good candidates to delta against
2697 magic.sort()
2698 return ((x[3], x[1]) for x in magic)
2701def deltas_from_sorted_objects(
2702 objects: Iterator[tuple[ShaFile, Optional[bytes]]],
2703 window_size: Optional[int] = None,
2704 progress: Optional[Callable[..., None]] = None,
2705) -> Iterator[UnpackedObject]:
2706 """Create deltas from sorted objects.
2708 Args:
2709 objects: Iterator of sorted objects to deltify
2710 window_size: Delta window size; None for default
2711 progress: Optional progress reporting callback
2713 Returns:
2714 Iterator of UnpackedObject entries
2715 """
2716 # TODO(jelmer): Use threads
2717 if window_size is None:
2718 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE
2720 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque()
2721 for i, (o, path) in enumerate(objects):
2722 if progress is not None and i % 1000 == 0:
2723 progress((f"generating deltas: {i}\r").encode())
2724 raw = o.as_raw_chunks()
2725 winner = raw
2726 winner_len = sum(map(len, winner))
2727 winner_base = None
2728 for base_id, base_type_num, base in possible_bases:
2729 if base_type_num != o.type_num:
2730 continue
2731 delta_len = 0
2732 delta = []
2733 for chunk in create_delta(b"".join(base), b"".join(raw)):
2734 delta_len += len(chunk)
2735 if delta_len >= winner_len:
2736 break
2737 delta.append(chunk)
2738 else:
2739 winner_base = base_id
2740 winner = delta
2741 winner_len = sum(map(len, winner))
2742 yield UnpackedObject(
2743 o.type_num,
2744 sha=o.sha().digest(),
2745 delta_base=winner_base,
2746 decomp_len=winner_len,
2747 decomp_chunks=winner,
2748 )
2749 possible_bases.appendleft((o.sha().digest(), o.type_num, raw))
2750 while len(possible_bases) > window_size:
2751 possible_bases.pop()
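# Editorial sketch (not part of dulwich): deltify_pack_objects() above orders
# candidates with sort_objects_for_delta() and then lets
# deltas_from_sorted_objects() try each object against a sliding window of
# recent bases, keeping a delta only when it beats the full text.
def _example_deltify_two_blobs() -> None:
    from dulwich.objects import Blob

    base = Blob.from_string(b"a" * 1000)
    similar = Blob.from_string(b"a" * 1000 + b"b")
    for unpacked in deltify_pack_objects(iter([base, similar])):
        # delta_base is None for full-text entries and holds the base object's
        # SHA when a smaller delta was found.
        print(sha_to_hex(unpacked.sha()), unpacked.delta_base is not None)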
2754def pack_objects_to_data(
2755 objects: Union[
2756 Sequence[ShaFile],
2757 Sequence[tuple[ShaFile, Optional[bytes]]],
2758 Sequence[tuple[ShaFile, Optional[PackHint]]],
2759 ],
2760 *,
2761 deltify: Optional[bool] = None,
2762 delta_window_size: Optional[int] = None,
2763 ofs_delta: bool = True,
2764 progress: Optional[Callable[..., None]] = None,
2765) -> tuple[int, Iterator[UnpackedObject]]:
2766 """Create pack data from objects.
2768 Args:
2769 objects: Pack objects
2770 deltify: Whether to deltify pack objects
2771 delta_window_size: Delta window size
2772 ofs_delta: Whether to use offset deltas
2773 progress: Optional progress reporting callback
2774 Returns: Tuple of (number of objects, iterator over UnpackedObject entries)
2775 """
2776 # TODO(jelmer): support deltaifying
2777 count = len(objects)
2778 if deltify is None:
2779 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
2780 # slow at the moment.
2781 deltify = False
2782 if deltify:
2783 return (
2784 count,
2785 deltify_pack_objects(
2786 iter(objects), # type: ignore
2787 window_size=delta_window_size,
2788 progress=progress,
2789 ),
2790 )
2791 else:
2793 def iter_without_path() -> Iterator[UnpackedObject]:
2794 for o in objects:
2795 if isinstance(o, tuple):
2796 yield full_unpacked_object(o[0])
2797 else:
2798 yield full_unpacked_object(o)
2800 return (count, iter_without_path())
2803def generate_unpacked_objects(
2804 container: PackedObjectContainer,
2805 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]],
2806 delta_window_size: Optional[int] = None,
2807 deltify: Optional[bool] = None,
2808 reuse_deltas: bool = True,
2809 ofs_delta: bool = True,
2810 other_haves: Optional[set[bytes]] = None,
2811 progress: Optional[Callable[..., None]] = None,
2812) -> Iterator[UnpackedObject]:
2813 """Create pack data from objects.
2815 Returns: Iterator over UnpackedObject entries
2816 """
2817 todo = dict(object_ids)
2818 if reuse_deltas:
2819 for unpack in find_reusable_deltas(
2820 container, set(todo), other_haves=other_haves, progress=progress
2821 ):
2822 del todo[sha_to_hex(unpack.sha())]
2823 yield unpack
2824 if deltify is None:
2825 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
2826 # slow at the moment.
2827 deltify = False
2828 if deltify:
2829 objects_to_delta = container.iterobjects_subset(
2830 todo.keys(), allow_missing=False
2831 )
2832 sorted_objs = sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta)
2833 yield from deltas_from_sorted_objects(
2834 sorted_objs,
2835 window_size=delta_window_size,
2836 progress=progress,
2837 )
2838 else:
2839 for oid in todo:
2840 yield full_unpacked_object(container[oid])
2843def full_unpacked_object(o: ShaFile) -> UnpackedObject:
2844 """Create an UnpackedObject from a ShaFile.
2846 Args:
2847 o: ShaFile object to convert
2849 Returns:
2850 UnpackedObject with full object data
2851 """
2852 return UnpackedObject(
2853 o.type_num,
2854 delta_base=None,
2855 crc32=None,
2856 decomp_chunks=o.as_raw_chunks(),
2857 sha=o.sha().digest(),
2858 )
2861def write_pack_from_container(
2862 write: Union[
2863 Callable[[bytes], None],
2864 Callable[[Union[bytes, bytearray, memoryview]], int],
2865 IO[bytes],
2866 ],
2867 container: PackedObjectContainer,
2868 object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]],
2869 delta_window_size: Optional[int] = None,
2870 deltify: Optional[bool] = None,
2871 reuse_deltas: bool = True,
2872 compression_level: int = -1,
2873 other_haves: Optional[set[bytes]] = None,
2874) -> tuple[dict[bytes, tuple[int, int]], bytes]:
2875 """Write a new pack data file.
2877 Args:
2878 write: write function to use
2879 container: PackedObjectContainer
2880 object_ids: Sequence of (object_id, hint) tuples to write
2881 delta_window_size: Sliding window size for searching for deltas;
2882 Set to None for default window size.
2883 deltify: Whether to deltify objects
2884 reuse_deltas: Whether to reuse existing deltas
2885 compression_level: the zlib compression level to use
2886 other_haves: Set of additional object IDs the receiver has
2887 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
2888 """
2889 pack_contents_count = len(object_ids)
2890 pack_contents = generate_unpacked_objects(
2891 container,
2892 object_ids,
2893 delta_window_size=delta_window_size,
2894 deltify=deltify,
2895 reuse_deltas=reuse_deltas,
2896 other_haves=other_haves,
2897 )
2899 return write_pack_data(
2900 write,
2901 pack_contents,
2902 num_records=pack_contents_count,
2903 compression_level=compression_level,
2904 )
2907def write_pack_objects(
2908 write: Union[Callable[[bytes], None], IO[bytes]],
2909 objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]],
2910 *,
2911 delta_window_size: Optional[int] = None,
2912 deltify: Optional[bool] = None,
2913 compression_level: int = -1,
2914) -> tuple[dict[bytes, tuple[int, int]], bytes]:
2915 """Write a new pack data file.
2917 Args:
2918 write: write function to use
2919 objects: Sequence of (object, path) tuples to write
2920 delta_window_size: Sliding window size for searching for deltas;
2921 Set to None for default window size.
2922 deltify: Whether to deltify objects
2923 compression_level: the zlib compression level to use
2924 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
2925 """
2926 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify)
2928 return write_pack_data(
2929 write,
2930 pack_contents,
2931 num_records=pack_contents_count,
2932 compression_level=compression_level,
2933 )
2936class PackChunkGenerator:
2937 """Generator for pack data chunks."""
2939 def __init__(
2940 self,
2941 num_records: Optional[int] = None,
2942 records: Optional[Iterator[UnpackedObject]] = None,
2943 progress: Optional[Callable[..., None]] = None,
2944 compression_level: int = -1,
2945 reuse_compressed: bool = True,
2946 ) -> None:
2947 """Initialize PackChunkGenerator.
2949 Args:
2950 num_records: Expected number of records
2951 records: Iterator of pack records
2952 progress: Optional progress callback
2953 compression_level: Compression level (-1 for default)
2954 reuse_compressed: Whether to reuse compressed chunks
2955 """
2956 self.cs = sha1(b"")
2957 self.entries: dict[bytes, tuple[int, int]] = {}
2958 if records is None:
2959 records = iter([]) # Empty iterator if None
2960 self._it = self._pack_data_chunks(
2961 records=records,
2962 num_records=num_records,
2963 progress=progress,
2964 compression_level=compression_level,
2965 reuse_compressed=reuse_compressed,
2966 )
2968 def sha1digest(self) -> bytes:
2969 """Return the SHA1 digest of the pack data."""
2970 return self.cs.digest()
2972 def __iter__(self) -> Iterator[bytes]:
2973 """Iterate over pack data chunks."""
2974 return self._it
2976 def _pack_data_chunks(
2977 self,
2978 records: Iterator[UnpackedObject],
2979 *,
2980 num_records: Optional[int] = None,
2981 progress: Optional[Callable[..., None]] = None,
2982 compression_level: int = -1,
2983 reuse_compressed: bool = True,
2984 ) -> Iterator[bytes]:
2985 """Iterate pack data file chunks.
2987 Args:
2988 records: Iterator over UnpackedObject
2989 num_records: Number of records (defaults to len(records) if not specified)
2990 progress: Function to report progress to
2991 compression_level: the zlib compression level
2992 reuse_compressed: Whether to reuse compressed chunks
2993 Returns: Iterator over pack data chunks; entries and checksum are recorded on the generator
2994 """
2995 # Write the pack
2996 if num_records is None:
2997 num_records = len(records) # type: ignore
2998 offset = 0
2999 for chunk in pack_header_chunks(num_records):
3000 yield chunk
3001 self.cs.update(chunk)
3002 offset += len(chunk)
3003 actual_num_records = 0
3004 for i, unpacked in enumerate(records):
3005 type_num = unpacked.pack_type_num
3006 if progress is not None and i % 1000 == 0:
3007 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii"))
3008 raw: Union[list[bytes], tuple[int, list[bytes]], tuple[bytes, list[bytes]]]
3009 if unpacked.delta_base is not None:
3010 assert isinstance(unpacked.delta_base, bytes), (
3011 f"Expected bytes, got {type(unpacked.delta_base)}"
3012 )
3013 try:
3014 base_offset, _base_crc32 = self.entries[unpacked.delta_base]
3015 except KeyError:
3016 type_num = REF_DELTA
3017 assert isinstance(unpacked.delta_base, bytes)
3018 raw = (unpacked.delta_base, unpacked.decomp_chunks)
3019 else:
3020 type_num = OFS_DELTA
3021 raw = (offset - base_offset, unpacked.decomp_chunks)
3022 else:
3023 raw = unpacked.decomp_chunks
3024 chunks: Union[list[bytes], Iterator[bytes]]
3025 if unpacked.comp_chunks is not None and reuse_compressed:
3026 chunks = unpacked.comp_chunks
3027 else:
3028 chunks = pack_object_chunks(
3029 type_num, raw, compression_level=compression_level
3030 )
3031 crc32 = 0
3032 object_size = 0
3033 for chunk in chunks:
3034 yield chunk
3035 crc32 = binascii.crc32(chunk, crc32)
3036 self.cs.update(chunk)
3037 object_size += len(chunk)
3038 actual_num_records += 1
3039 self.entries[unpacked.sha()] = (offset, crc32)
3040 offset += object_size
3041 if actual_num_records != num_records:
3042 raise AssertionError(
3043 f"actual records written differs: {actual_num_records} != {num_records}"
3044 )
3046 yield self.cs.digest()
3049def write_pack_data(
3050 write: Union[
3051 Callable[[bytes], None],
3052 Callable[[Union[bytes, bytearray, memoryview]], int],
3053 IO[bytes],
3054 ],
3055 records: Iterator[UnpackedObject],
3056 *,
3057 num_records: Optional[int] = None,
3058 progress: Optional[Callable[..., None]] = None,
3059 compression_level: int = -1,
3060) -> tuple[dict[bytes, tuple[int, int]], bytes]:
3061 """Write a new pack data file.
3063 Args:
3064 write: Write function to use
3065 num_records: Number of records (defaults to len(records) if None)
3066 records: Iterator over UnpackedObject entries to write
3067 progress: Function to report progress to
3068 compression_level: the zlib compression level
3069 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
3070 """
3071 chunk_generator = PackChunkGenerator(
3072 num_records=num_records,
3073 records=records,
3074 progress=progress,
3075 compression_level=compression_level,
3076 )
3077 for chunk in chunk_generator:
3078 if callable(write):
3079 write(chunk)
3080 else:
3081 write.write(chunk)
3082 return chunk_generator.entries, chunk_generator.sha1digest()
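# Editorial sketch (not part of dulwich): write_pack_data() above just drains
# a PackChunkGenerator into the supplied write callable; the trailing pack
# checksum is emitted by the generator itself. Path and objects are
# illustrative only.
def _example_write_pack_data(path: str = "/tmp/example.pack") -> dict[bytes, tuple[int, int]]:
    from dulwich.objects import Blob

    records = [full_unpacked_object(Blob.from_string(b"hello"))]
    with open(path, "wb") as out:
        entries, _pack_checksum = write_pack_data(
            out.write, iter(records), num_records=len(records)
        )
    return entries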
3085def write_pack_index_v1(
3086 f: IO[bytes],
3087 entries: Iterable[tuple[bytes, int, Union[int, None]]],
3088 pack_checksum: bytes,
3089) -> bytes:
3090 """Write a new pack index file.
3092 Args:
3093 f: A file-like object to write to
3094 entries: List of tuples with object name (sha), offset_in_pack,
3095 and crc32_checksum.
3096 pack_checksum: Checksum of the pack file.
3097 Returns: The SHA of the written index file
3098 """
3099 f = SHA1Writer(f)
3100 fan_out_table: dict[int, int] = defaultdict(lambda: 0)
3101 for name, _offset, _entry_checksum in entries:
3102 fan_out_table[ord(name[:1])] += 1
3103 # Fan-out table
3104 for i in range(0x100):
3105 f.write(struct.pack(">L", fan_out_table[i]))
3106 fan_out_table[i + 1] += fan_out_table[i]
3107 for name, offset, _entry_checksum in entries:
3108 if not (offset <= 0xFFFFFFFF):
3109 raise TypeError("pack format 1 only supports offsets < 4 GiB")
3110 f.write(struct.pack(">L20s", offset, name))
3111 assert len(pack_checksum) == 20
3112 f.write(pack_checksum)
3113 return f.write_sha()
3116def _delta_encode_size(size: int) -> bytes:
3117 ret = bytearray()
3118 c = size & 0x7F
3119 size >>= 7
3120 while size:
3121 ret.append(c | 0x80)
3122 c = size & 0x7F
3123 size >>= 7
3124 ret.append(c)
3125 return bytes(ret)
3128# The length of delta compression copy operations in version 2 packs is limited
3129# to 64K. To copy more, we use several copy operations. Version 3 packs allow
3130# 24-bit lengths in copy operations, but we always make version 2 packs.
3131_MAX_COPY_LEN = 0xFFFF
3134def _encode_copy_operation(start: int, length: int) -> bytes:
3135 scratch = bytearray([0x80])
3136 for i in range(4):
3137 if start & 0xFF << i * 8:
3138 scratch.append((start >> i * 8) & 0xFF)
3139 scratch[0] |= 1 << i
3140 for i in range(2):
3141 if length & 0xFF << i * 8:
3142 scratch.append((length >> i * 8) & 0xFF)
3143 scratch[0] |= 1 << (4 + i)
3144 return bytes(scratch)
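# Editorial sketch (not part of dulwich): worked examples for the two delta
# encoding helpers above. Sizes are little-endian base-128 varints; copy
# operations set bit 7 of the command byte and use its low bits as a mask of
# which offset/length bytes follow.
def _example_delta_encoding_helpers() -> None:
    # 200 == 0x48 | (0x01 << 7): low seven bits first, continuation bit set.
    assert _delta_encode_size(200) == b"\xc8\x01"
    # Copy 0x2000 bytes from offset 0x1000: command byte 0xa2 announces that
    # offset byte 1 and length byte 1 follow.
    assert _encode_copy_operation(0x1000, 0x2000) == b"\xa2\x10\x20"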
3147def create_delta(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
3148 """Use python difflib to work out how to transform base_buf to target_buf.
3150 Args:
3151 base_buf: Base buffer
3152 target_buf: Target buffer
3153 """
3154 if isinstance(base_buf, list):
3155 base_buf = b"".join(base_buf)
3156 if isinstance(target_buf, list):
3157 target_buf = b"".join(target_buf)
3158 assert isinstance(base_buf, bytes)
3159 assert isinstance(target_buf, bytes)
3160 # write delta header
3161 yield _delta_encode_size(len(base_buf))
3162 yield _delta_encode_size(len(target_buf))
3163 # write out delta opcodes
3164 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf)
3165 for opcode, i1, i2, j1, j2 in seq.get_opcodes():
3166 # Git patch opcodes don't care about deletes!
3167 # if opcode == 'replace' or opcode == 'delete':
3168 # pass
3169 if opcode == "equal":
3170 # If they are equal, unpacker will use data from base_buf
3171 # Write out an opcode that says what range to use
3172 copy_start = i1
3173 copy_len = i2 - i1
3174 while copy_len > 0:
3175 to_copy = min(copy_len, _MAX_COPY_LEN)
3176 yield _encode_copy_operation(copy_start, to_copy)
3177 copy_start += to_copy
3178 copy_len -= to_copy
3179 if opcode == "replace" or opcode == "insert":
3180 # If we are replacing a range or adding one, then we just
3181 # output it to the stream (prefixed by its size)
3182 s = j2 - j1
3183 o = j1
3184 while s > 127:
3185 yield bytes([127])
3186 yield bytes(memoryview(target_buf)[o : o + 127])
3187 s -= 127
3188 o += 127
3189 yield bytes([s])
3190 yield bytes(memoryview(target_buf)[o : o + s])
3193def apply_delta(
3194 src_buf: Union[bytes, list[bytes]], delta: Union[bytes, list[bytes]]
3195) -> list[bytes]:
3196 """Based on the similar function in git's patch-delta.c.
3198 Args:
3199 src_buf: Source buffer
3200 delta: Delta instructions
3201 """
3202 if not isinstance(src_buf, bytes):
3203 src_buf = b"".join(src_buf)
3204 if not isinstance(delta, bytes):
3205 delta = b"".join(delta)
3206 out = []
3207 index = 0
3208 delta_length = len(delta)
3210 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]:
3211 size = 0
3212 i = 0
3213 while delta:
3214 cmd = ord(delta[index : index + 1])
3215 index += 1
3216 size |= (cmd & ~0x80) << i
3217 i += 7
3218 if not cmd & 0x80:
3219 break
3220 return size, index
3222 src_size, index = get_delta_header_size(delta, index)
3223 dest_size, index = get_delta_header_size(delta, index)
3224 if src_size != len(src_buf):
3225 raise ApplyDeltaError(
3226 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}"
3227 )
3228 while index < delta_length:
3229 cmd = ord(delta[index : index + 1])
3230 index += 1
3231 if cmd & 0x80:
3232 cp_off = 0
3233 for i in range(4):
3234 if cmd & (1 << i):
3235 x = ord(delta[index : index + 1])
3236 index += 1
3237 cp_off |= x << (i * 8)
3238 cp_size = 0
3239 # Version 3 packs can contain copy sizes larger than 64K.
3240 for i in range(3):
3241 if cmd & (1 << (4 + i)):
3242 x = ord(delta[index : index + 1])
3243 index += 1
3244 cp_size |= x << (i * 8)
3245 if cp_size == 0:
3246 cp_size = 0x10000
3247 if (
3248 cp_off + cp_size < cp_size
3249 or cp_off + cp_size > src_size
3250 or cp_size > dest_size
3251 ):
3252 break
3253 out.append(src_buf[cp_off : cp_off + cp_size])
3254 elif cmd != 0:
3255 out.append(delta[index : index + cmd])
3256 index += cmd
3257 else:
3258 raise ApplyDeltaError("Invalid opcode 0")
3260 if index != delta_length:
3261 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}")
3263 if dest_size != chunks_length(out):
3264 raise ApplyDeltaError("dest size incorrect")
3266 return out
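# Editorial sketch (not part of dulwich): create_delta() and apply_delta()
# above are inverses, so a round trip makes a convenient sanity check.
def _example_delta_round_trip() -> None:
    base = b"the quick brown fox jumps over the lazy dog"
    target = b"the quick brown fox jumps over the lazy cat"
    delta = b"".join(create_delta(base, target))
    assert b"".join(apply_delta(base, delta)) == target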
3269def write_pack_index_v2(
3270 f: IO[bytes],
3271 entries: Iterable[tuple[bytes, int, Union[int, None]]],
3272 pack_checksum: bytes,
3273) -> bytes:
3274 """Write a new pack index file.
3276 Args:
3277 f: File-like object to write to
3278 entries: List of tuples with object name (sha), offset_in_pack, and
3279 crc32_checksum.
3280 pack_checksum: Checksum of the pack file.
3281 Returns: The SHA of the index file written
3282 """
3283 f = SHA1Writer(f)
3284 f.write(b"\377tOc") # Magic!
3285 f.write(struct.pack(">L", 2))
3286 fan_out_table: dict[int, int] = defaultdict(lambda: 0)
3287 for name, offset, entry_checksum in entries:
3288 fan_out_table[ord(name[:1])] += 1
3289 # Fan-out table
3290 largetable: list[int] = []
3291 for i in range(0x100):
3292 f.write(struct.pack(b">L", fan_out_table[i]))
3293 fan_out_table[i + 1] += fan_out_table[i]
3294 for name, offset, entry_checksum in entries:
3295 f.write(name)
3296 for name, offset, entry_checksum in entries:
3297 f.write(struct.pack(b">L", entry_checksum))
3298 for name, offset, entry_checksum in entries:
3299 if offset < 2**31:
3300 f.write(struct.pack(b">L", offset))
3301 else:
3302 f.write(struct.pack(b">L", 2**31 + len(largetable)))
3303 largetable.append(offset)
3304 for offset in largetable:
3305 f.write(struct.pack(b">Q", offset))
3306 assert len(pack_checksum) == 20
3307 f.write(pack_checksum)
3308 return f.write_sha()
3311def write_pack_index_v3(
3312 f: IO[bytes],
3313 entries: Iterable[tuple[bytes, int, Union[int, None]]],
3314 pack_checksum: bytes,
3315 hash_algorithm: int = 1,
3316) -> bytes:
3317 """Write a new pack index file in v3 format.
3319 Args:
3320 f: File-like object to write to
3321 entries: List of tuples with object name (sha), offset_in_pack, and
3322 crc32_checksum.
3323 pack_checksum: Checksum of the pack file.
3324 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
3325 Returns: The SHA of the index file written
3326 """
3327 if hash_algorithm == 1:
3328 hash_size = 20 # SHA-1
3329 writer_cls = SHA1Writer
3330 elif hash_algorithm == 2:
3331 hash_size = 32 # SHA-256
3332 # TODO: Add SHA256Writer when SHA-256 support is implemented
3333 raise NotImplementedError("SHA-256 support not yet implemented")
3334 else:
3335 raise ValueError(f"Unknown hash algorithm {hash_algorithm}")
3337 # Convert entries to list to allow multiple iterations
3338 entries_list = list(entries)
3340 # Calculate shortest unambiguous prefix length for object names
3341 # For now, use full hash size (this could be optimized)
3342 shortened_oid_len = hash_size
3344 f = writer_cls(f)
3345 f.write(b"\377tOc") # Magic!
3346 f.write(struct.pack(">L", 3)) # Version 3
3347 f.write(struct.pack(">L", hash_algorithm)) # Hash algorithm
3348 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length
3350 fan_out_table: dict[int, int] = defaultdict(lambda: 0)
3351 for name, offset, entry_checksum in entries_list:
3352 if len(name) != hash_size:
3353 raise ValueError(
3354 f"Object name has wrong length: expected {hash_size}, got {len(name)}"
3355 )
3356 fan_out_table[ord(name[:1])] += 1
3358 # Fan-out table
3359 largetable: list[int] = []
3360 for i in range(0x100):
3361 f.write(struct.pack(b">L", fan_out_table[i]))
3362 fan_out_table[i + 1] += fan_out_table[i]
3364 # Object names table
3365 for name, offset, entry_checksum in entries_list:
3366 f.write(name)
3368 # CRC32 checksums table
3369 for name, offset, entry_checksum in entries_list:
3370 f.write(struct.pack(b">L", entry_checksum))
3372 # Offset table
3373 for name, offset, entry_checksum in entries_list:
3374 if offset < 2**31:
3375 f.write(struct.pack(b">L", offset))
3376 else:
3377 f.write(struct.pack(b">L", 2**31 + len(largetable)))
3378 largetable.append(offset)
3380 # Large offset table
3381 for offset in largetable:
3382 f.write(struct.pack(b">Q", offset))
3384 assert len(pack_checksum) == hash_size, (
3385 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}"
3386 )
3387 f.write(pack_checksum)
3388 return f.write_sha()
3391def write_pack_index(
3392 f: IO[bytes],
3393 entries: Iterable[tuple[bytes, int, Union[int, None]]],
3394 pack_checksum: bytes,
3395 progress: Optional[Callable[..., None]] = None,
3396 version: Optional[int] = None,
3397) -> bytes:
3398 """Write a pack index file.
3400 Args:
3401 f: File-like object to write to.
3402 entries: List of (checksum, offset, crc32) tuples
3403 pack_checksum: Checksum of the pack file.
3404 progress: Progress function (not currently used)
3405 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION.
3407 Returns:
3408 SHA of the written index file
3409 """
3410 if version is None:
3411 version = DEFAULT_PACK_INDEX_VERSION
3413 if version == 1:
3414 return write_pack_index_v1(f, entries, pack_checksum)
3415 elif version == 2:
3416 return write_pack_index_v2(f, entries, pack_checksum)
3417 elif version == 3:
3418 return write_pack_index_v3(f, entries, pack_checksum)
3419 else:
3420 raise ValueError(f"Unsupported pack index version: {version}")
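# Editorial sketch (not part of dulwich): write_pack_index() above is the
# version-dispatching entry point used once the pack's (sha, offset, crc32)
# entries are known. The output path is illustrative only.
def _example_write_index(
    entries: list[tuple[bytes, int, int]], pack_checksum: bytes
) -> bytes:
    # entries must be sorted by SHA, e.g. the result of PackData.sorted_entries().
    with GitFile("/tmp/example.idx", "wb") as f:
        return write_pack_index(f, entries, pack_checksum, version=2)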
3423class Pack:
3424 """A Git pack object."""
3426 _data_load: Optional[Callable[[], PackData]]
3427 _idx_load: Optional[Callable[[], PackIndex]]
3429 _data: Optional[PackData]
3430 _idx: Optional[PackIndex]
3432 def __init__(
3433 self,
3434 basename: str,
3435 resolve_ext_ref: Optional[ResolveExtRefFn] = None,
3436 *,
3437 delta_window_size: Optional[int] = None,
3438 window_memory: Optional[int] = None,
3439 delta_cache_size: Optional[int] = None,
3440 depth: Optional[int] = None,
3441 threads: Optional[int] = None,
3442 big_file_threshold: Optional[int] = None,
3443 ) -> None:
3444 """Initialize a Pack object.
3446 Args:
3447 basename: Base path for pack files (without .pack/.idx extension)
3448 resolve_ext_ref: Optional function to resolve external references
3449 delta_window_size: Size of the delta compression window
3450 window_memory: Memory limit for delta compression window
3451 delta_cache_size: Size of the delta cache
3452 depth: Maximum depth for delta chains
3453 threads: Number of threads to use for operations
3454 big_file_threshold: Size threshold for big file handling
3455 """
3456 self._basename = basename
3457 self._data = None
3458 self._idx = None
3459 self._idx_path = self._basename + ".idx"
3460 self._data_path = self._basename + ".pack"
3461 self.delta_window_size = delta_window_size
3462 self.window_memory = window_memory
3463 self.delta_cache_size = delta_cache_size
3464 self.depth = depth
3465 self.threads = threads
3466 self.big_file_threshold = big_file_threshold
3467 self._data_load = lambda: PackData(
3468 self._data_path,
3469 delta_window_size=delta_window_size,
3470 window_memory=window_memory,
3471 delta_cache_size=delta_cache_size,
3472 depth=depth,
3473 threads=threads,
3474 big_file_threshold=big_file_threshold,
3475 )
3476 self._idx_load = lambda: load_pack_index(self._idx_path)
3477 self.resolve_ext_ref = resolve_ext_ref
3479 @classmethod
3480 def from_lazy_objects(
3481 cls, data_fn: Callable[[], PackData], idx_fn: Callable[[], PackIndex]
3482 ) -> "Pack":
3483 """Create a new pack object from callables to load pack data and index objects."""
3484 ret = cls("")
3485 ret._data_load = data_fn
3486 ret._idx_load = idx_fn
3487 return ret
3489 @classmethod
3490 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack":
3491 """Create a new pack object from pack data and index objects."""
3492 ret = cls("")
3493 ret._data = data
3494 ret._data_load = None
3495 ret._idx = idx
3496 ret._idx_load = None
3497 ret.check_length_and_checksum()
3498 return ret
3500 def name(self) -> bytes:
3501 """The SHA over the SHAs of the objects in this pack."""
3502 return self.index.objects_sha1()
3504 @property
3505 def data(self) -> PackData:
3506 """The pack data object being used."""
3507 if self._data is None:
3508 assert self._data_load
3509 self._data = self._data_load()
3510 self.check_length_and_checksum()
3511 return self._data
3513 @property
3514 def index(self) -> PackIndex:
3515 """The index being used.
3517 Note: This may be an in-memory index
3518 """
3519 if self._idx is None:
3520 assert self._idx_load
3521 self._idx = self._idx_load()
3522 return self._idx
3524 def close(self) -> None:
3525 """Close the pack file and index."""
3526 if self._data is not None:
3527 self._data.close()
3528 if self._idx is not None:
3529 self._idx.close()
3531 def __enter__(self) -> "Pack":
3532 """Enter context manager."""
3533 return self
3535 def __exit__(
3536 self,
3537 exc_type: Optional[type],
3538 exc_val: Optional[BaseException],
3539 exc_tb: Optional[TracebackType],
3540 ) -> None:
3541 """Exit context manager."""
3542 self.close()
3544 def __eq__(self, other: object) -> bool:
3545 """Check equality with another pack."""
3546 if not isinstance(other, Pack):
3547 return False
3548 return self.index == other.index
3550 def __len__(self) -> int:
3551 """Number of entries in this pack."""
3552 return len(self.index)
3554 def __repr__(self) -> str:
3555 """Return string representation of this pack."""
3556 return f"{self.__class__.__name__}({self._basename!r})"
3558 def __iter__(self) -> Iterator[bytes]:
3559 """Iterate over all the sha1s of the objects in this pack."""
3560 return iter(self.index)
3562 def check_length_and_checksum(self) -> None:
3563 """Sanity check the length and checksum of the pack index and data."""
3564 assert len(self.index) == len(self.data), (
3565 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
3566 )
3567 idx_stored_checksum = self.index.get_pack_checksum()
3568 data_stored_checksum = self.data.get_stored_checksum()
3569 if (
3570 idx_stored_checksum is not None
3571 and idx_stored_checksum != data_stored_checksum
3572 ):
3573 raise ChecksumMismatch(
3574 sha_to_hex(idx_stored_checksum),
3575 sha_to_hex(data_stored_checksum),
3576 )
3578 def check(self) -> None:
3579 """Check the integrity of this pack.
3581 Raises:
3582 ChecksumMismatch: if a checksum for the index or data is wrong
3583 """
3584 self.index.check()
3585 self.data.check()
3586 for obj in self.iterobjects():
3587 obj.check()
3588 # TODO: object connectivity checks
3590 def get_stored_checksum(self) -> bytes:
3591 """Return the stored checksum of the pack data."""
3592 return self.data.get_stored_checksum()
3594 def pack_tuples(self) -> list[tuple[ShaFile, None]]:
3595 """Return pack tuples for all objects in pack."""
3596 return [(o, None) for o in self.iterobjects()]
3598 def __contains__(self, sha1: bytes) -> bool:
3599 """Check whether this pack contains a particular SHA1."""
3600 try:
3601 self.index.object_offset(sha1)
3602 return True
3603 except KeyError:
3604 return False
3606 def get_raw(self, sha1: bytes) -> tuple[int, bytes]:
3607 """Get raw object data by SHA1."""
3608 offset = self.index.object_offset(sha1)
3609 obj_type, obj = self.data.get_object_at(offset)
3610 type_num, chunks = self.resolve_object(offset, obj_type, obj)
3611 return type_num, b"".join(chunks) # type: ignore[arg-type]
3613 def __getitem__(self, sha1: bytes) -> ShaFile:
3614 """Retrieve the specified SHA1."""
3615 type, uncomp = self.get_raw(sha1)
3616 return ShaFile.from_raw_string(type, uncomp, sha=sha1)
3618 def iterobjects(self) -> Iterator[ShaFile]:
3619 """Iterate over the objects in this pack."""
3620 return iter(
3621 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref)
3622 )
3624 def iterobjects_subset(
3625 self, shas: Iterable[ObjectID], *, allow_missing: bool = False
3626 ) -> Iterator[ShaFile]:
3627 """Iterate over a subset of objects in this pack."""
3628 return (
3629 uo
3630 for uo in PackInflater.for_pack_subset(
3631 self,
3632 shas,
3633 allow_missing=allow_missing,
3634 resolve_ext_ref=self.resolve_ext_ref,
3635 )
3636 if uo.id in shas
3637 )
3639 def iter_unpacked_subset(
3640 self,
3641 shas: Iterable[ObjectID],
3642 *,
3643 include_comp: bool = False,
3644 allow_missing: bool = False,
3645 convert_ofs_delta: bool = False,
3646 ) -> Iterator[UnpackedObject]:
3647 """Iterate over unpacked objects in subset."""
3648 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list)
3649 ofs: dict[int, bytes] = {}
3650 todo = set(shas)
3651 for unpacked in self.iter_unpacked(include_comp=include_comp):
3652 sha = unpacked.sha()
3653 if unpacked.offset is not None:
3654 ofs[unpacked.offset] = sha
3655 hexsha = sha_to_hex(sha)
3656 if hexsha in todo:
3657 if unpacked.pack_type_num == OFS_DELTA:
3658 assert isinstance(unpacked.delta_base, int)
3659 assert unpacked.offset is not None
3660 base_offset = unpacked.offset - unpacked.delta_base
3661 try:
3662 unpacked.delta_base = ofs[base_offset]
3663 except KeyError:
3664 ofs_pending[base_offset].append(unpacked)
3665 continue
3666 else:
3667 unpacked.pack_type_num = REF_DELTA
3668 yield unpacked
3669 todo.remove(hexsha)
3670 if unpacked.offset is not None:
3671 for child in ofs_pending.pop(unpacked.offset, []):
3672 child.pack_type_num = REF_DELTA
3673 child.delta_base = sha
3674 yield child
3675 assert not ofs_pending
3676 if not allow_missing and todo:
3677 raise UnresolvedDeltas(list(todo))
3679 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]:
3680 """Iterate over all unpacked objects in this pack."""
3681 ofs_to_entries = {
3682 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries()
3683 }
3684 for unpacked in self.data.iter_unpacked(include_comp=include_comp):
3685 assert unpacked.offset is not None
3686 (sha, crc32) = ofs_to_entries[unpacked.offset]
3687 unpacked._sha = sha
3688 unpacked.crc32 = crc32
3689 yield unpacked
3691 def keep(self, msg: Optional[bytes] = None) -> str:
3692 """Add a .keep file for the pack, preventing git from garbage collecting it.
3694 Args:
3695 msg: A message written inside the .keep file; can be used later
3696 to determine whether or not a .keep file is obsolete.
3697 Returns: The path of the .keep file, as a string.
3698 """
3699 keepfile_name = f"{self._basename}.keep"
3700 with GitFile(keepfile_name, "wb") as keepfile:
3701 if msg:
3702 keepfile.write(msg)
3703 keepfile.write(b"\n")
3704 return keepfile_name
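    # Usage sketch (illustrative): marking a pack as precious so that a later
    # repack/garbage collection will not delete it. Removing the .keep file
    # afterwards makes the pack prunable again.
    #
    #   keep_path = pack.keep(b"kept while fetch is in progress")
    #   # ... later: os.remove(keep_path)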
3706 def get_ref(self, sha: bytes) -> tuple[Optional[int], int, OldUnpackedObject]:
3707 """Get the object for a ref SHA, only looking in this pack."""
3708 # TODO: cache these results
3709 try:
3710 offset = self.index.object_offset(sha)
3711 except KeyError:
3712 offset = None
3713 if offset:
3714 type, obj = self.data.get_object_at(offset)
3715 elif self.resolve_ext_ref:
3716 type, obj = self.resolve_ext_ref(sha)
3717 else:
3718 raise KeyError(sha)
3719 return offset, type, obj
3721 def resolve_object(
3722 self,
3723 offset: int,
3724 type: int,
3725 obj: OldUnpackedObject,
3726 get_ref: Optional[
3727 Callable[[bytes], tuple[Optional[int], int, OldUnpackedObject]]
3728 ] = None,
3729 ) -> tuple[int, OldUnpackedObject]:
3730 """Resolve an object, resolving any deltas along the way.
3732 Returns: Tuple with object type and contents.
3733 """
3734 # Walk down the delta chain, building a stack of deltas to reach
3735 # the requested object.
3736 base_offset = offset
3737 base_type = type
3738 base_obj = obj
3739 delta_stack = []
3740 while base_type in DELTA_TYPES:
3741 prev_offset = base_offset
3742 if get_ref is None:
3743 get_ref = self.get_ref
3744 if base_type == OFS_DELTA:
3745 (delta_offset, delta) = base_obj
3746 # TODO: clean up asserts and replace with nicer error messages
3747 assert isinstance(delta_offset, int), (
3748 f"Expected int, got {delta_offset.__class__}"
3749 )
3750 base_offset = base_offset - delta_offset
3751 base_type, base_obj = self.data.get_object_at(base_offset)
3752 assert isinstance(base_type, int)
3753 elif base_type == REF_DELTA:
3754 (basename, delta) = base_obj
3755 assert isinstance(basename, bytes) and len(basename) == 20
3756 base_offset, base_type, base_obj = get_ref(basename) # type: ignore[assignment]
3757 assert isinstance(base_type, int)
3758 if base_offset == prev_offset: # object is based on itself
3759 raise UnresolvedDeltas([basename])
3760 delta_stack.append((prev_offset, base_type, delta))
3762 # Now grab the base object (mustn't be a delta) and apply the
3763 # deltas all the way up the stack.
3764 chunks = base_obj
3765 for prev_offset, _delta_type, delta in reversed(delta_stack):
3766 # Convert chunks to bytes for apply_delta if needed
3767 if isinstance(chunks, list):
3768 chunks_bytes = b"".join(chunks)
3769 elif isinstance(chunks, tuple):
3770 # For tuple type, second element is the actual data
3771 _, chunk_data = chunks
3772 if isinstance(chunk_data, list):
3773 chunks_bytes = b"".join(chunk_data)
3774 else:
3775 chunks_bytes = chunk_data
3776 else:
3777 chunks_bytes = chunks
3779 # Apply delta and get result as list
3780 chunks = apply_delta(chunks_bytes, delta)
3782 if prev_offset is not None:
3783 self.data._offset_cache[prev_offset] = base_type, chunks
3784 return base_type, chunks
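    # Worked example (illustrative) of the delta application behind
    # resolve_object(): a git delta starts with two little-endian base-128
    # varints (source size, then target size), followed by copy/insert
    # instructions; an opcode byte in 0x01-0x7f is an "insert" whose value is
    # the number of literal bytes that follow. apply_delta() returns a list of
    # chunks, as the loop above assumes.
    #
    #   base = b"hello"
    #   delta = b"\x05\x02" + b"\x02hi"   # src size 5, dst size 2, insert "hi"
    #   assert b"".join(apply_delta(base, delta)) == b"hi"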
3786 def entries(
3787 self, progress: Optional[Callable[[int, int], None]] = None
3788 ) -> Iterator[PackIndexEntry]:
3789 """Yield entries summarizing the contents of this pack.
3791 Args:
3792 progress: Progress function, called with current and total
3793 object count.
3794 Returns: iterator of tuples with (sha, offset, crc32)
3795 """
3796 return self.data.iterentries(
3797 progress=progress, resolve_ext_ref=self.resolve_ext_ref
3798 )
3800 def sorted_entries(
3801 self, progress: Optional[ProgressFn] = None
3802 ) -> Iterator[PackIndexEntry]:
3803 """Return entries in this pack, sorted by SHA.
3805 Args:
3806 progress: Progress function, called with current and total
3807 object count
3808 Returns: Iterator of tuples with (sha, offset, crc32)
3809 """
3810 return iter(
3811 self.data.sorted_entries(
3812 progress=progress, resolve_ext_ref=self.resolve_ext_ref
3813 )
3814 )
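    # Usage sketch (illustrative): regenerating an index for this pack from its
    # sorted entries, assuming write_pack_index_v2(f, entries, pack_checksum)
    # from this module and a hypothetical output path.
    #
    #   with open("objects/pack/pack-deadbeef.idx", "wb") as idx:
    #       write_pack_index_v2(idx, list(pack.sorted_entries()),
    #                           pack.data.get_stored_checksum())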
3816 def get_unpacked_object(
3817 self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True
3818 ) -> UnpackedObject:
3819 """Get the unpacked object for a sha.
3821 Args:
3822 sha: SHA of object to fetch
3823 include_comp: Whether to include compression data in UnpackedObject
3824 convert_ofs_delta: Whether to convert offset deltas to ref deltas
3825 """
3826 offset = self.index.object_offset(sha)
3827 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp)
3828 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta:
3829 assert isinstance(unpacked.delta_base, int)
3830 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base)
3831 unpacked.pack_type_num = REF_DELTA
3832 return unpacked
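    # Usage sketch (illustrative): fetching a single undecoded pack record,
    # e.g. to copy it verbatim into another pack. With the default
    # convert_ofs_delta=True, an offset delta comes back rebased on its base
    # object's SHA as a REF_DELTA. The SHA is a placeholder.
    #
    #   unpacked = pack.get_unpacked_object(b"a" * 40, include_comp=True)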
3835def extend_pack(
3836 f: BinaryIO,
3837 object_ids: Set[ObjectID],
3838 get_raw: Callable[[ObjectID], tuple[int, bytes]],
3839 *,
3840 compression_level: int = -1,
3841 progress: Optional[Callable[[bytes], None]] = None,
3842) -> tuple[bytes, list[tuple[bytes, int, int]]]:
3843 """Extend a pack file with more objects.
3845 The caller should make sure that object_ids does not contain any objects
3846 that are already in the pack.
3847 """
3848 # Update the header with the new number of objects.
3849 f.seek(0)
3850 _version, num_objects = read_pack_header(f.read)
3852 if object_ids:
3853 f.seek(0)
3854 write_pack_header(f.write, num_objects + len(object_ids))
3856 # Must flush before reading (http://bugs.python.org/issue3207)
3857 f.flush()
3859 # Rescan the rest of the pack, computing the SHA with the new header.
3860 new_sha = compute_file_sha(f, end_ofs=-20)
3862 # Must reposition before writing (http://bugs.python.org/issue3207)
3863 f.seek(0, os.SEEK_CUR)
3865 extra_entries = []
3867 # Complete the pack.
3868 for i, object_id in enumerate(object_ids):
3869 if progress is not None:
3870 progress(
3871 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii")
3872 )
3873 assert len(object_id) == 20
3874 type_num, data = get_raw(object_id)
3875 offset = f.tell()
3876 crc32 = write_pack_object(
3877 f.write,
3878 type_num,
3879 [data], # Convert bytes to list[bytes]
3880 sha=new_sha,
3881 compression_level=compression_level,
3882 )
3883 extra_entries.append((object_id, offset, crc32))
3884 pack_sha = new_sha.digest()
3885 f.write(pack_sha)
3886 return pack_sha, extra_entries
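# Usage sketch (illustrative): completing a thin pack by appending the base
# objects it references but does not contain. "missing_bases" and
# "object_store" are hypothetical; object_ids must be 20-byte binary SHAs and
# get_raw must return (type_num, raw_bytes).
#
#   with open("incoming.pack", "r+b") as f:
#       pack_sha, extra_entries = extend_pack(
#           f,
#           missing_bases,
#           object_store.get_raw,
#           progress=lambda msg: None,
#       )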
3889try:
3890 from dulwich._pack import ( # type: ignore
3891 apply_delta,
3892 bisect_find_sha,
3893 )
3894except ImportError:
3895 pass
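# When the compiled extension is not installed, the ImportError is ignored and
# the pure-Python apply_delta and bisect_find_sha defined earlier in this
# module remain in use.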