Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/pack.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pack.py -- For dealing with packed git objects.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Classes for dealing with packed git objects.
25A pack is a compact representation of a bunch of objects, stored
26using deltas where possible.
28They have two parts, the pack file, which stores the data, and an index
29that tells you where the data is.
31To find an object you look in all of the index files 'til you find a
32match for the object name. You then use the pointer got from this as
33a pointer in to the corresponding packfile.
34"""
36import binascii
37from collections import defaultdict, deque
38from contextlib import suppress
39from io import BytesIO, UnsupportedOperation
41try:
42 from cdifflib import CSequenceMatcher as SequenceMatcher
43except ModuleNotFoundError:
44 from difflib import SequenceMatcher
46import os
47import struct
48import sys
49import warnings
50import zlib
51from collections.abc import Callable, Iterable, Iterator, Sequence, Set
52from hashlib import sha1
53from itertools import chain
54from os import SEEK_CUR, SEEK_END
55from struct import unpack_from
56from types import TracebackType
57from typing import (
58 IO,
59 TYPE_CHECKING,
60 Any,
61 BinaryIO,
62 Generic,
63 Optional,
64 Protocol,
65 TypeVar,
66 Union,
67)
69try:
70 import mmap
71except ImportError:
72 has_mmap = False
73else:
74 has_mmap = True
76if sys.version_info >= (3, 12):
77 from collections.abc import Buffer
78else:
79 Buffer = bytes | bytearray | memoryview
81if TYPE_CHECKING:
82 from _hashlib import HASH as HashObject
84 from .bitmap import PackBitmap
85 from .commit_graph import CommitGraph
87# For some reason the above try, except fails to set has_mmap = False for plan9
88if sys.platform == "Plan9":
89 has_mmap = False
91from . import replace_me
92from .errors import ApplyDeltaError, ChecksumMismatch
93from .file import GitFile, _GitFile
94from .lru_cache import LRUSizeCache
95from .objects import ObjectID, ShaFile, hex_to_sha, object_header, sha_to_hex
97OFS_DELTA = 6
98REF_DELTA = 7
100DELTA_TYPES = (OFS_DELTA, REF_DELTA)
103DEFAULT_PACK_DELTA_WINDOW_SIZE = 10
105# Keep pack files under 16Mb in memory, otherwise write them out to disk
106PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024
108# Default pack index version to use when none is specified
109DEFAULT_PACK_INDEX_VERSION = 2
112OldUnpackedObject = tuple[bytes | int, list[bytes]] | list[bytes]
113ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]]
114ProgressFn = Callable[[int, str], None]
115PackHint = tuple[int, bytes | None]
118class UnresolvedDeltas(Exception):
119 """Delta objects could not be resolved."""
121 def __init__(self, shas: list[bytes]) -> None:
122 """Initialize UnresolvedDeltas exception.
124 Args:
125 shas: List of SHA hashes for unresolved delta objects
126 """
127 self.shas = shas
130class ObjectContainer(Protocol):
131 """Protocol for objects that can contain git objects."""
133 def add_object(self, obj: ShaFile) -> None:
134 """Add a single object to this object store."""
136 def add_objects(
137 self,
138 objects: Sequence[tuple[ShaFile, str | None]],
139 progress: Callable[..., None] | None = None,
140 ) -> Optional["Pack"]:
141 """Add a set of objects to this object store.
143 Args:
144 objects: Iterable over a list of (object, path) tuples
145 progress: Progress callback for object insertion
146 Returns: Optional Pack object of the objects written.
147 """
149 def __contains__(self, sha1: bytes) -> bool:
150 """Check if a hex sha is present."""
152 def __getitem__(self, sha1: bytes) -> ShaFile:
153 """Retrieve an object."""
155 def get_commit_graph(self) -> Optional["CommitGraph"]:
156 """Get the commit graph for this object store.
158 Returns:
159 CommitGraph object if available, None otherwise
160 """
161 return None
164class PackedObjectContainer(ObjectContainer):
165 """Container for objects packed in a pack file."""
167 def get_unpacked_object(
168 self, sha1: bytes, *, include_comp: bool = False
169 ) -> "UnpackedObject":
170 """Get a raw unresolved object.
172 Args:
173 sha1: SHA-1 hash of the object
174 include_comp: Whether to include compressed data
176 Returns:
177 UnpackedObject instance
178 """
179 raise NotImplementedError(self.get_unpacked_object)
181 def iterobjects_subset(
182 self, shas: Iterable[bytes], *, allow_missing: bool = False
183 ) -> Iterator[ShaFile]:
184 """Iterate over a subset of objects.
186 Args:
187 shas: Iterable of object SHAs to retrieve
188 allow_missing: If True, skip missing objects
190 Returns:
191 Iterator of ShaFile objects
192 """
193 raise NotImplementedError(self.iterobjects_subset)
195 def iter_unpacked_subset(
196 self,
197 shas: Iterable[bytes],
198 *,
199 include_comp: bool = False,
200 allow_missing: bool = False,
201 convert_ofs_delta: bool = True,
202 ) -> Iterator["UnpackedObject"]:
203 """Iterate over unpacked objects from a subset of SHAs.
205 Args:
206 shas: Set of object SHAs to retrieve
207 include_comp: Include compressed data if True
208 allow_missing: If True, skip missing objects
209 convert_ofs_delta: If True, convert offset deltas to ref deltas
211 Returns:
212 Iterator of UnpackedObject instances
213 """
214 raise NotImplementedError(self.iter_unpacked_subset)
217class UnpackedObjectStream:
218 """Abstract base class for a stream of unpacked objects."""
220 def __iter__(self) -> Iterator["UnpackedObject"]:
221 """Iterate over unpacked objects."""
222 raise NotImplementedError(self.__iter__)
224 def __len__(self) -> int:
225 """Return the number of objects in the stream."""
226 raise NotImplementedError(self.__len__)
229def take_msb_bytes(
230 read: Callable[[int], bytes], crc32: int | None = None
231) -> tuple[list[int], int | None]:
232 """Read bytes marked with most significant bit.
234 Args:
235 read: Read function
236 crc32: Optional CRC32 checksum to update
238 Returns:
239 Tuple of (list of bytes read, updated CRC32 or None)
240 """
241 ret: list[int] = []
242 while len(ret) == 0 or ret[-1] & 0x80:
243 b = read(1)
244 if crc32 is not None:
245 crc32 = binascii.crc32(b, crc32)
246 ret.append(ord(b[:1]))
247 return ret, crc32
250class PackFileDisappeared(Exception):
251 """Raised when a pack file unexpectedly disappears."""
253 def __init__(self, obj: object) -> None:
254 """Initialize PackFileDisappeared exception.
256 Args:
257 obj: The object that triggered the exception
258 """
259 self.obj = obj
262class UnpackedObject:
263 """Class encapsulating an object unpacked from a pack file.
265 These objects should only be created from within unpack_object. Most
266 members start out as empty and are filled in at various points by
267 read_zlib_chunks, unpack_object, DeltaChainIterator, etc.
269 End users of this object should take care that the function they're getting
270 this object from is guaranteed to set the members they need.
271 """
273 __slots__ = [
274 "_sha", # Cached binary SHA.
275 "comp_chunks", # Compressed object chunks.
276 "crc32", # CRC32.
277 "decomp_chunks", # Decompressed object chunks.
278 "decomp_len", # Decompressed length of this object.
279 "delta_base", # Delta base offset or SHA.
280 "obj_chunks", # Decompressed and delta-resolved chunks.
281 "obj_type_num", # Type of this object.
282 "offset", # Offset in its pack.
283 "pack_type_num", # Type of this object in the pack (may be a delta).
284 ]
286 obj_type_num: int | None
287 obj_chunks: list[bytes] | None
288 delta_base: None | bytes | int
289 decomp_chunks: list[bytes]
290 comp_chunks: list[bytes] | None
291 decomp_len: int | None
292 crc32: int | None
293 offset: int | None
294 pack_type_num: int
295 _sha: bytes | None
297 # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be
298 # methods of this object.
299 def __init__(
300 self,
301 pack_type_num: int,
302 *,
303 delta_base: None | bytes | int = None,
304 decomp_len: int | None = None,
305 crc32: int | None = None,
306 sha: bytes | None = None,
307 decomp_chunks: list[bytes] | None = None,
308 offset: int | None = None,
309 ) -> None:
310 """Initialize an UnpackedObject.
312 Args:
313 pack_type_num: Type number of this object in the pack
314 delta_base: Delta base (offset or SHA) if this is a delta object
315 decomp_len: Decompressed length of this object
316 crc32: CRC32 checksum
317 sha: SHA-1 hash of the object
318 decomp_chunks: Decompressed chunks
319 offset: Offset in the pack file
320 """
321 self.offset = offset
322 self._sha = sha
323 self.pack_type_num = pack_type_num
324 self.delta_base = delta_base
325 self.comp_chunks = None
326 self.decomp_chunks: list[bytes] = decomp_chunks or []
327 if decomp_chunks is not None and decomp_len is None:
328 self.decomp_len = sum(map(len, decomp_chunks))
329 else:
330 self.decomp_len = decomp_len
331 self.crc32 = crc32
333 if pack_type_num in DELTA_TYPES:
334 self.obj_type_num = None
335 self.obj_chunks = None
336 else:
337 self.obj_type_num = pack_type_num
338 self.obj_chunks = self.decomp_chunks
339 self.delta_base = delta_base
341 def sha(self) -> bytes:
342 """Return the binary SHA of this object."""
343 if self._sha is None:
344 assert self.obj_type_num is not None and self.obj_chunks is not None
345 self._sha = obj_sha(self.obj_type_num, self.obj_chunks)
346 return self._sha
348 def sha_file(self) -> ShaFile:
349 """Return a ShaFile from this object."""
350 assert self.obj_type_num is not None and self.obj_chunks is not None
351 return ShaFile.from_raw_chunks(self.obj_type_num, self.obj_chunks)
353 # Only provided for backwards compatibility with code that expects either
354 # chunks or a delta tuple.
355 def _obj(self) -> OldUnpackedObject:
356 """Return the decompressed chunks, or (delta base, delta chunks)."""
357 if self.pack_type_num in DELTA_TYPES:
358 assert isinstance(self.delta_base, (bytes, int))
359 return (self.delta_base, self.decomp_chunks)
360 else:
361 return self.decomp_chunks
363 def __eq__(self, other: object) -> bool:
364 """Check equality with another UnpackedObject."""
365 if not isinstance(other, UnpackedObject):
366 return False
367 for slot in self.__slots__:
368 if getattr(self, slot) != getattr(other, slot):
369 return False
370 return True
372 def __ne__(self, other: object) -> bool:
373 """Check inequality with another UnpackedObject."""
374 return not (self == other)
376 def __repr__(self) -> str:
377 """Return string representation of this UnpackedObject."""
378 data = [f"{s}={getattr(self, s)!r}" for s in self.__slots__]
379 return "{}({})".format(self.__class__.__name__, ", ".join(data))
382_ZLIB_BUFSIZE = 65536 # 64KB buffer for better I/O performance
385def read_zlib_chunks(
386 read_some: Callable[[int], bytes],
387 unpacked: UnpackedObject,
388 include_comp: bool = False,
389 buffer_size: int = _ZLIB_BUFSIZE,
390) -> bytes:
391 """Read zlib data from a buffer.
393 This function requires that the buffer have additional data following the
394 compressed data, which is guaranteed to be the case for git pack files.
396 Args:
397 read_some: Read function that returns at least one byte, but may
398 return less than the requested size.
399 unpacked: An UnpackedObject to write result data to. If its crc32
400 attr is not None, the CRC32 of the compressed bytes will be computed
401 using this starting CRC32.
402 After this function, will have the following attrs set:
403 * comp_chunks (if include_comp is True)
404 * decomp_chunks
405 * decomp_len
406 * crc32
407 include_comp: If True, include compressed data in the result.
408 buffer_size: Size of the read buffer.
409 Returns: Leftover unused data from the decompression.
411 Raises:
412 zlib.error: if a decompression error occurred.
413 """
414 if unpacked.decomp_len is None or unpacked.decomp_len <= -1:
415 raise ValueError("non-negative zlib data stream size expected")
416 decomp_obj = zlib.decompressobj()
418 comp_chunks = []
419 decomp_chunks = unpacked.decomp_chunks
420 decomp_len = 0
421 crc32 = unpacked.crc32
423 while True:
424 add = read_some(buffer_size)
425 if not add:
426 raise zlib.error("EOF before end of zlib stream")
427 comp_chunks.append(add)
428 decomp = decomp_obj.decompress(add)
429 decomp_len += len(decomp)
430 decomp_chunks.append(decomp)
431 unused = decomp_obj.unused_data
432 if unused:
433 left = len(unused)
434 if crc32 is not None:
435 crc32 = binascii.crc32(add[:-left], crc32)
436 if include_comp:
437 comp_chunks[-1] = add[:-left]
438 break
439 elif crc32 is not None:
440 crc32 = binascii.crc32(add, crc32)
441 if crc32 is not None:
442 crc32 &= 0xFFFFFFFF
444 if decomp_len != unpacked.decomp_len:
445 raise zlib.error("decompressed data does not match expected size")
447 unpacked.crc32 = crc32
448 if include_comp:
449 unpacked.comp_chunks = comp_chunks
450 return unused
453def iter_sha1(iter: Iterable[bytes]) -> bytes:
454 """Return the hexdigest of the SHA1 over a set of names.
456 Args:
457 iter: Iterator over string objects
458 Returns: 40-byte hex sha1 digest
459 """
460 sha = sha1()
461 for name in iter:
462 sha.update(name)
463 return sha.hexdigest().encode("ascii")
466def load_pack_index(path: str | os.PathLike[str]) -> "PackIndex":
467 """Load an index file by path.
469 Args:
470 path: Path to the index file
471 Returns: A PackIndex loaded from the given path
472 """
473 with GitFile(path, "rb") as f:
474 return load_pack_index_file(path, f)
477def _load_file_contents(
478 f: IO[bytes] | _GitFile, size: int | None = None
479) -> tuple[bytes | Any, int]:
480 """Load contents from a file, preferring mmap when possible.
482 Args:
483 f: File-like object to load
484 size: Expected size, or None to determine from file
485 Returns: Tuple of (contents, size)
486 """
487 try:
488 fd = f.fileno()
489 except (UnsupportedOperation, AttributeError):
490 fd = None
491 # Attempt to use mmap if possible
492 if fd is not None:
493 if size is None:
494 size = os.fstat(fd).st_size
495 if has_mmap:
496 try:
497 contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ)
498 except (OSError, ValueError):
499 # Can't mmap - perhaps a socket or invalid file descriptor
500 pass
501 else:
502 return contents, size
503 contents_bytes = f.read()
504 size = len(contents_bytes)
505 return contents_bytes, size
508def load_pack_index_file(
509 path: str | os.PathLike[str], f: IO[bytes] | _GitFile
510) -> "PackIndex":
511 """Load an index file from a file-like object.
513 Args:
514 path: Path for the index file
515 f: File-like object
516 Returns: A PackIndex loaded from the given file
517 """
518 contents, size = _load_file_contents(f)
519 if contents[:4] == b"\377tOc":
520 version = struct.unpack(b">L", contents[4:8])[0]
521 if version == 2:
522 return PackIndex2(path, file=f, contents=contents, size=size)
523 elif version == 3:
524 return PackIndex3(path, file=f, contents=contents, size=size)
525 else:
526 raise KeyError(f"Unknown pack index format {version}")
527 else:
528 return PackIndex1(path, file=f, contents=contents, size=size)
531def bisect_find_sha(
532 start: int, end: int, sha: bytes, unpack_name: Callable[[int], bytes]
533) -> int | None:
534 """Find a SHA in a data blob with sorted SHAs.
536 Args:
537 start: Start index of range to search
538 end: End index of range to search
539 sha: Sha to find
540 unpack_name: Callback to retrieve SHA by index
541 Returns: Index of the SHA, or None if it wasn't found
542 """
543 assert start <= end
544 while start <= end:
545 i = (start + end) // 2
546 file_sha = unpack_name(i)
547 if file_sha < sha:
548 start = i + 1
549 elif file_sha > sha:
550 end = i - 1
551 else:
552 return i
553 return None
556PackIndexEntry = tuple[bytes, int, int | None]
559class PackIndex:
560 """An index in to a packfile.
562 Given a sha id of an object a pack index can tell you the location in the
563 packfile of that object if it has it.
564 """
566 # Default to SHA-1 for backward compatibility
567 hash_algorithm = 1
568 hash_size = 20
570 def __eq__(self, other: object) -> bool:
571 """Check equality with another PackIndex."""
572 if not isinstance(other, PackIndex):
573 return False
575 for (name1, _, _), (name2, _, _) in zip(
576 self.iterentries(), other.iterentries()
577 ):
578 if name1 != name2:
579 return False
580 return True
582 def __ne__(self, other: object) -> bool:
583 """Check if this pack index is not equal to another."""
584 return not self.__eq__(other)
586 def __len__(self) -> int:
587 """Return the number of entries in this pack index."""
588 raise NotImplementedError(self.__len__)
590 def __iter__(self) -> Iterator[bytes]:
591 """Iterate over the SHAs in this pack."""
592 return map(sha_to_hex, self._itersha())
594 def iterentries(self) -> Iterator[PackIndexEntry]:
595 """Iterate over the entries in this pack index.
597 Returns: iterator over tuples with object name, offset in packfile and
598 crc32 checksum.
599 """
600 raise NotImplementedError(self.iterentries)
602 def get_pack_checksum(self) -> bytes | None:
603 """Return the SHA1 checksum stored for the corresponding packfile.
605 Returns: 20-byte binary digest, or None if not available
606 """
607 raise NotImplementedError(self.get_pack_checksum)
609 @replace_me(since="0.21.0", remove_in="0.23.0")
610 def object_index(self, sha: bytes) -> int:
611 """Return the index for the given SHA.
613 Args:
614 sha: SHA-1 hash
616 Returns:
617 Index position
618 """
619 return self.object_offset(sha)
621 def object_offset(self, sha: bytes) -> int:
622 """Return the offset in to the corresponding packfile for the object.
624 Given the name of an object it will return the offset that object
625 lives at within the corresponding pack file. If the pack file doesn't
626 have the object then None will be returned.
627 """
628 raise NotImplementedError(self.object_offset)
630 def object_sha1(self, index: int) -> bytes:
631 """Return the SHA1 corresponding to the index in the pack file."""
632 for name, offset, _crc32 in self.iterentries():
633 if offset == index:
634 return name
635 else:
636 raise KeyError(index)
638 def _object_offset(self, sha: bytes) -> int:
639 """See object_offset.
641 Args:
642 sha: A *binary* SHA string. (20 characters long)_
643 """
644 raise NotImplementedError(self._object_offset)
646 def objects_sha1(self) -> bytes:
647 """Return the hex SHA1 over all the shas of all objects in this pack.
649 Note: This is used for the filename of the pack.
650 """
651 return iter_sha1(self._itersha())
653 def _itersha(self) -> Iterator[bytes]:
654 """Yield all the SHA1's of the objects in the index, sorted."""
655 raise NotImplementedError(self._itersha)
657 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]:
658 """Iterate over all SHA1s with the given prefix.
660 Args:
661 prefix: Binary prefix to match
662 Returns: Iterator of matching SHA1s
663 """
664 # Default implementation for PackIndex classes that don't override
665 for sha, _, _ in self.iterentries():
666 if sha.startswith(prefix):
667 yield sha
669 def close(self) -> None:
670 """Close any open files."""
672 def check(self) -> None:
673 """Check the consistency of this pack index."""
676class MemoryPackIndex(PackIndex):
677 """Pack index that is stored entirely in memory."""
679 def __init__(
680 self,
681 entries: list[tuple[bytes, int, int | None]],
682 pack_checksum: bytes | None = None,
683 ) -> None:
684 """Create a new MemoryPackIndex.
686 Args:
687 entries: Sequence of name, idx, crc32 (sorted)
688 pack_checksum: Optional pack checksum
689 """
690 self._by_sha = {}
691 self._by_offset = {}
692 for name, offset, _crc32 in entries:
693 self._by_sha[name] = offset
694 self._by_offset[offset] = name
695 self._entries = entries
696 self._pack_checksum = pack_checksum
698 def get_pack_checksum(self) -> bytes | None:
699 """Return the SHA checksum stored for the corresponding packfile."""
700 return self._pack_checksum
702 def __len__(self) -> int:
703 """Return the number of entries in this pack index."""
704 return len(self._entries)
706 def object_offset(self, sha: bytes) -> int:
707 """Return the offset for the given SHA.
709 Args:
710 sha: SHA to look up (binary or hex)
711 Returns: Offset in the pack file
712 """
713 if len(sha) == 40:
714 sha = hex_to_sha(sha)
715 return self._by_sha[sha]
717 def object_sha1(self, offset: int) -> bytes:
718 """Return the SHA1 for the object at the given offset."""
719 return self._by_offset[offset]
721 def _itersha(self) -> Iterator[bytes]:
722 """Iterate over all SHA1s in the index."""
723 return iter(self._by_sha)
725 def iterentries(self) -> Iterator[PackIndexEntry]:
726 """Iterate over all index entries."""
727 return iter(self._entries)
729 @classmethod
730 def for_pack(cls, pack_data: "PackData") -> "MemoryPackIndex":
731 """Create a MemoryPackIndex from a PackData object."""
732 return MemoryPackIndex(
733 list(pack_data.sorted_entries()), pack_data.get_stored_checksum()
734 )
736 @classmethod
737 def clone(cls, other_index: "PackIndex") -> "MemoryPackIndex":
738 """Create a copy of another PackIndex in memory."""
739 return cls(list(other_index.iterentries()), other_index.get_pack_checksum())
742class FilePackIndex(PackIndex):
743 """Pack index that is based on a file.
745 To do the loop it opens the file, and indexes first 256 4 byte groups
746 with the first byte of the sha id. The value in the four byte group indexed
747 is the end of the group that shares the same starting byte. Subtract one
748 from the starting byte and index again to find the start of the group.
749 The values are sorted by sha id within the group, so do the math to find
750 the start and end offset and then bisect in to find if the value is
751 present.
752 """
754 _fan_out_table: list[int]
755 _file: IO[bytes] | _GitFile
757 def __init__(
758 self,
759 filename: str | os.PathLike[str],
760 file: IO[bytes] | _GitFile | None = None,
761 contents: Union[bytes, "mmap.mmap"] | None = None,
762 size: int | None = None,
763 ) -> None:
764 """Create a pack index object.
766 Provide it with the name of the index file to consider, and it will map
767 it whenever required.
768 """
769 self._filename = filename
770 # Take the size now, so it can be checked each time we map the file to
771 # ensure that it hasn't changed.
772 if file is None:
773 self._file = GitFile(filename, "rb")
774 else:
775 self._file = file
776 if contents is None:
777 self._contents, self._size = _load_file_contents(self._file, size)
778 else:
779 self._contents = contents
780 self._size = size if size is not None else len(contents)
782 @property
783 def path(self) -> str:
784 """Return the path to this index file."""
785 return os.fspath(self._filename)
787 def __eq__(self, other: object) -> bool:
788 """Check equality with another FilePackIndex."""
789 # Quick optimization:
790 if (
791 isinstance(other, FilePackIndex)
792 and self._fan_out_table != other._fan_out_table
793 ):
794 return False
796 return super().__eq__(other)
798 def close(self) -> None:
799 """Close the underlying file and any mmap."""
800 self._file.close()
801 close_fn = getattr(self._contents, "close", None)
802 if close_fn is not None:
803 close_fn()
805 def __len__(self) -> int:
806 """Return the number of entries in this pack index."""
807 return self._fan_out_table[-1]
809 def _unpack_entry(self, i: int) -> PackIndexEntry:
810 """Unpack the i-th entry in the index file.
812 Returns: Tuple with object name (SHA), offset in pack file and CRC32
813 checksum (if known).
814 """
815 raise NotImplementedError(self._unpack_entry)
817 def _unpack_name(self, i: int) -> bytes:
818 """Unpack the i-th name from the index file."""
819 raise NotImplementedError(self._unpack_name)
821 def _unpack_offset(self, i: int) -> int:
822 """Unpack the i-th object offset from the index file."""
823 raise NotImplementedError(self._unpack_offset)
825 def _unpack_crc32_checksum(self, i: int) -> int | None:
826 """Unpack the crc32 checksum for the ith object from the index file."""
827 raise NotImplementedError(self._unpack_crc32_checksum)
829 def _itersha(self) -> Iterator[bytes]:
830 """Iterate over all SHA1s in the index."""
831 for i in range(len(self)):
832 yield self._unpack_name(i)
834 def iterentries(self) -> Iterator[PackIndexEntry]:
835 """Iterate over the entries in this pack index.
837 Returns: iterator over tuples with object name, offset in packfile and
838 crc32 checksum.
839 """
840 for i in range(len(self)):
841 yield self._unpack_entry(i)
843 def _read_fan_out_table(self, start_offset: int) -> list[int]:
844 """Read the fan-out table from the index.
846 The fan-out table contains 256 entries mapping first byte values
847 to the number of objects with SHA1s less than or equal to that byte.
849 Args:
850 start_offset: Offset in the file where the fan-out table starts
851 Returns: List of 256 integers
852 """
853 ret = []
854 for i in range(0x100):
855 fanout_entry = self._contents[
856 start_offset + i * 4 : start_offset + (i + 1) * 4
857 ]
858 ret.append(struct.unpack(">L", fanout_entry)[0])
859 return ret
861 def check(self) -> None:
862 """Check that the stored checksum matches the actual checksum."""
863 actual = self.calculate_checksum()
864 stored = self.get_stored_checksum()
865 if actual != stored:
866 raise ChecksumMismatch(stored, actual)
868 def calculate_checksum(self) -> bytes:
869 """Calculate the SHA1 checksum over this pack index.
871 Returns: This is a 20-byte binary digest
872 """
873 return sha1(self._contents[:-20]).digest()
875 def get_pack_checksum(self) -> bytes:
876 """Return the SHA1 checksum stored for the corresponding packfile.
878 Returns: 20-byte binary digest
879 """
880 return bytes(self._contents[-40:-20])
882 def get_stored_checksum(self) -> bytes:
883 """Return the SHA1 checksum stored for this index.
885 Returns: 20-byte binary digest
886 """
887 return bytes(self._contents[-20:])
889 def object_offset(self, sha: bytes) -> int:
890 """Return the offset in to the corresponding packfile for the object.
892 Given the name of an object it will return the offset that object
893 lives at within the corresponding pack file. If the pack file doesn't
894 have the object then None will be returned.
895 """
896 if len(sha) == 40:
897 sha = hex_to_sha(sha)
898 try:
899 return self._object_offset(sha)
900 except ValueError as exc:
901 closed = getattr(self._contents, "closed", None)
902 if closed in (None, True):
903 raise PackFileDisappeared(self) from exc
904 raise
906 def _object_offset(self, sha: bytes) -> int:
907 """See object_offset.
909 Args:
910 sha: A *binary* SHA string. (20 characters long)_
911 """
912 assert len(sha) == 20
913 idx = ord(sha[:1])
914 if idx == 0:
915 start = 0
916 else:
917 start = self._fan_out_table[idx - 1]
918 end = self._fan_out_table[idx]
919 i = bisect_find_sha(start, end, sha, self._unpack_name)
920 if i is None:
921 raise KeyError(sha)
922 return self._unpack_offset(i)
924 def iter_prefix(self, prefix: bytes) -> Iterator[bytes]:
925 """Iterate over all SHA1s with the given prefix."""
926 start = ord(prefix[:1])
927 if start == 0:
928 start = 0
929 else:
930 start = self._fan_out_table[start - 1]
931 end = ord(prefix[:1]) + 1
932 if end == 0x100:
933 end = len(self)
934 else:
935 end = self._fan_out_table[end]
936 assert start <= end
937 started = False
938 for i in range(start, end):
939 name: bytes = self._unpack_name(i)
940 if name.startswith(prefix):
941 yield name
942 started = True
943 elif started:
944 break
947class PackIndex1(FilePackIndex):
948 """Version 1 Pack Index file."""
950 def __init__(
951 self,
952 filename: str | os.PathLike[str],
953 file: IO[bytes] | _GitFile | None = None,
954 contents: bytes | None = None,
955 size: int | None = None,
956 ) -> None:
957 """Initialize a version 1 pack index.
959 Args:
960 filename: Path to the index file
961 file: Optional file object
962 contents: Optional mmap'd contents
963 size: Optional size of the index
964 """
965 super().__init__(filename, file, contents, size)
966 self.version = 1
967 self._fan_out_table = self._read_fan_out_table(0)
969 def _unpack_entry(self, i: int) -> tuple[bytes, int, None]:
970 (offset, name) = unpack_from(">L20s", self._contents, (0x100 * 4) + (i * 24))
971 return (name, offset, None)
973 def _unpack_name(self, i: int) -> bytes:
974 offset = (0x100 * 4) + (i * 24) + 4
975 return self._contents[offset : offset + 20]
977 def _unpack_offset(self, i: int) -> int:
978 offset = (0x100 * 4) + (i * 24)
979 result = unpack_from(">L", self._contents, offset)[0]
980 assert isinstance(result, int)
981 return result
983 def _unpack_crc32_checksum(self, i: int) -> None:
984 # Not stored in v1 index files
985 return None
988class PackIndex2(FilePackIndex):
989 """Version 2 Pack Index file."""
991 def __init__(
992 self,
993 filename: str | os.PathLike[str],
994 file: IO[bytes] | _GitFile | None = None,
995 contents: bytes | None = None,
996 size: int | None = None,
997 ) -> None:
998 """Initialize a version 2 pack index.
1000 Args:
1001 filename: Path to the index file
1002 file: Optional file object
1003 contents: Optional mmap'd contents
1004 size: Optional size of the index
1005 """
1006 super().__init__(filename, file, contents, size)
1007 if self._contents[:4] != b"\377tOc":
1008 raise AssertionError("Not a v2 pack index file")
1009 (self.version,) = unpack_from(b">L", self._contents, 4)
1010 if self.version != 2:
1011 raise AssertionError(f"Version was {self.version}")
1012 self._fan_out_table = self._read_fan_out_table(8)
1013 self._name_table_offset = 8 + 0x100 * 4
1014 self._crc32_table_offset = self._name_table_offset + 20 * len(self)
1015 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
1016 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
1017 self
1018 )
1020 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]:
1021 return (
1022 self._unpack_name(i),
1023 self._unpack_offset(i),
1024 self._unpack_crc32_checksum(i),
1025 )
1027 def _unpack_name(self, i: int) -> bytes:
1028 offset = self._name_table_offset + i * 20
1029 return self._contents[offset : offset + 20]
1031 def _unpack_offset(self, i: int) -> int:
1032 offset_pos = self._pack_offset_table_offset + i * 4
1033 offset = unpack_from(">L", self._contents, offset_pos)[0]
1034 assert isinstance(offset, int)
1035 if offset & (2**31):
1036 large_offset_pos = (
1037 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
1038 )
1039 offset = unpack_from(">Q", self._contents, large_offset_pos)[0]
1040 assert isinstance(offset, int)
1041 return offset
1043 def _unpack_crc32_checksum(self, i: int) -> int:
1044 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
1045 assert isinstance(result, int)
1046 return result
1049class PackIndex3(FilePackIndex):
1050 """Version 3 Pack Index file.
1052 Supports variable hash sizes for SHA-1 (20 bytes) and SHA-256 (32 bytes).
1053 """
1055 def __init__(
1056 self,
1057 filename: str | os.PathLike[str],
1058 file: IO[bytes] | _GitFile | None = None,
1059 contents: bytes | None = None,
1060 size: int | None = None,
1061 ) -> None:
1062 """Initialize a version 3 pack index.
1064 Args:
1065 filename: Path to the index file
1066 file: Optional file object
1067 contents: Optional mmap'd contents
1068 size: Optional size of the index
1069 """
1070 super().__init__(filename, file, contents, size)
1071 if self._contents[:4] != b"\377tOc":
1072 raise AssertionError("Not a v3 pack index file")
1073 (self.version,) = unpack_from(b">L", self._contents, 4)
1074 if self.version != 3:
1075 raise AssertionError(f"Version was {self.version}")
1077 # Read hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
1078 (self.hash_algorithm,) = unpack_from(b">L", self._contents, 8)
1079 if self.hash_algorithm == 1:
1080 self.hash_size = 20 # SHA-1
1081 elif self.hash_algorithm == 2:
1082 self.hash_size = 32 # SHA-256
1083 else:
1084 raise AssertionError(f"Unknown hash algorithm {self.hash_algorithm}")
1086 # Read length of shortened object names
1087 (self.shortened_oid_len,) = unpack_from(b">L", self._contents, 12)
1089 # Calculate offsets based on variable hash size
1090 self._fan_out_table = self._read_fan_out_table(
1091 16
1092 ) # After header (4 + 4 + 4 + 4)
1093 self._name_table_offset = 16 + 0x100 * 4
1094 self._crc32_table_offset = self._name_table_offset + self.hash_size * len(self)
1095 self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
1096 self._pack_offset_largetable_offset = self._pack_offset_table_offset + 4 * len(
1097 self
1098 )
1100 def _unpack_entry(self, i: int) -> tuple[bytes, int, int]:
1101 return (
1102 self._unpack_name(i),
1103 self._unpack_offset(i),
1104 self._unpack_crc32_checksum(i),
1105 )
1107 def _unpack_name(self, i: int) -> bytes:
1108 offset = self._name_table_offset + i * self.hash_size
1109 return self._contents[offset : offset + self.hash_size]
1111 def _unpack_offset(self, i: int) -> int:
1112 offset_pos = self._pack_offset_table_offset + i * 4
1113 offset = unpack_from(">L", self._contents, offset_pos)[0]
1114 assert isinstance(offset, int)
1115 if offset & (2**31):
1116 large_offset_pos = (
1117 self._pack_offset_largetable_offset + (offset & (2**31 - 1)) * 8
1118 )
1119 offset = unpack_from(">Q", self._contents, large_offset_pos)[0]
1120 assert isinstance(offset, int)
1121 return offset
1123 def _unpack_crc32_checksum(self, i: int) -> int:
1124 result = unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
1125 assert isinstance(result, int)
1126 return result
1129def read_pack_header(read: Callable[[int], bytes]) -> tuple[int, int]:
1130 """Read the header of a pack file.
1132 Args:
1133 read: Read function
1134 Returns: Tuple of (pack version, number of objects). If no data is
1135 available to read, returns (None, None).
1136 """
1137 header = read(12)
1138 if not header:
1139 raise AssertionError("file too short to contain pack")
1140 if header[:4] != b"PACK":
1141 raise AssertionError(f"Invalid pack header {header!r}")
1142 (version,) = unpack_from(b">L", header, 4)
1143 if version not in (2, 3):
1144 raise AssertionError(f"Version was {version}")
1145 (num_objects,) = unpack_from(b">L", header, 8)
1146 return (version, num_objects)
1149def chunks_length(chunks: bytes | Iterable[bytes]) -> int:
1150 """Get the total length of a sequence of chunks.
1152 Args:
1153 chunks: Either a single bytes object or an iterable of bytes
1154 Returns: Total length in bytes
1155 """
1156 if isinstance(chunks, bytes):
1157 return len(chunks)
1158 else:
1159 return sum(map(len, chunks))
1162def unpack_object(
1163 read_all: Callable[[int], bytes],
1164 read_some: Callable[[int], bytes] | None = None,
1165 compute_crc32: bool = False,
1166 include_comp: bool = False,
1167 zlib_bufsize: int = _ZLIB_BUFSIZE,
1168) -> tuple[UnpackedObject, bytes]:
1169 """Unpack a Git object.
1171 Args:
1172 read_all: Read function that blocks until the number of requested
1173 bytes are read.
1174 read_some: Read function that returns at least one byte, but may not
1175 return the number of bytes requested.
1176 compute_crc32: If True, compute the CRC32 of the compressed data. If
1177 False, the returned CRC32 will be None.
1178 include_comp: If True, include compressed data in the result.
1179 zlib_bufsize: An optional buffer size for zlib operations.
1180 Returns: A tuple of (unpacked, unused), where unused is the unused data
1181 leftover from decompression, and unpacked in an UnpackedObject with
1182 the following attrs set:
1184 * obj_chunks (for non-delta types)
1185 * pack_type_num
1186 * delta_base (for delta types)
1187 * comp_chunks (if include_comp is True)
1188 * decomp_chunks
1189 * decomp_len
1190 * crc32 (if compute_crc32 is True)
1191 """
1192 if read_some is None:
1193 read_some = read_all
1194 if compute_crc32:
1195 crc32 = 0
1196 else:
1197 crc32 = None
1199 raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
1200 type_num = (raw[0] >> 4) & 0x07
1201 size = raw[0] & 0x0F
1202 for i, byte in enumerate(raw[1:]):
1203 size += (byte & 0x7F) << ((i * 7) + 4)
1205 delta_base: int | bytes | None
1206 raw_base = len(raw)
1207 if type_num == OFS_DELTA:
1208 raw, crc32 = take_msb_bytes(read_all, crc32=crc32)
1209 raw_base += len(raw)
1210 if raw[-1] & 0x80:
1211 raise AssertionError
1212 delta_base_offset = raw[0] & 0x7F
1213 for byte in raw[1:]:
1214 delta_base_offset += 1
1215 delta_base_offset <<= 7
1216 delta_base_offset += byte & 0x7F
1217 delta_base = delta_base_offset
1218 elif type_num == REF_DELTA:
1219 delta_base_obj = read_all(20)
1220 if crc32 is not None:
1221 crc32 = binascii.crc32(delta_base_obj, crc32)
1222 delta_base = delta_base_obj
1223 raw_base += 20
1224 else:
1225 delta_base = None
1227 unpacked = UnpackedObject(
1228 type_num, delta_base=delta_base, decomp_len=size, crc32=crc32
1229 )
1230 unused = read_zlib_chunks(
1231 read_some,
1232 unpacked,
1233 buffer_size=zlib_bufsize,
1234 include_comp=include_comp,
1235 )
1236 return unpacked, unused
1239def _compute_object_size(value: tuple[int, Any]) -> int:
1240 """Compute the size of a unresolved object for use with LRUSizeCache."""
1241 (num, obj) = value
1242 if num in DELTA_TYPES:
1243 return chunks_length(obj[1])
1244 return chunks_length(obj)
1247class PackStreamReader:
1248 """Class to read a pack stream.
1250 The pack is read from a ReceivableProtocol using read() or recv() as
1251 appropriate.
1252 """
1254 def __init__(
1255 self,
1256 read_all: Callable[[int], bytes],
1257 read_some: Callable[[int], bytes] | None = None,
1258 zlib_bufsize: int = _ZLIB_BUFSIZE,
1259 ) -> None:
1260 """Initialize pack stream reader.
1262 Args:
1263 read_all: Function to read all requested bytes
1264 read_some: Function to read some bytes (optional)
1265 zlib_bufsize: Buffer size for zlib decompression
1266 """
1267 self.read_all = read_all
1268 if read_some is None:
1269 self.read_some = read_all
1270 else:
1271 self.read_some = read_some
1272 self.sha = sha1()
1273 self._offset = 0
1274 self._rbuf = BytesIO()
1275 # trailer is a deque to avoid memory allocation on small reads
1276 self._trailer: deque[int] = deque()
1277 self._zlib_bufsize = zlib_bufsize
1279 def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
1280 """Read up to size bytes using the given callback.
1282 As a side effect, update the verifier's hash (excluding the last 20
1283 bytes read).
1285 Args:
1286 read: The read callback to read from.
1287 size: The maximum number of bytes to read; the particular
1288 behavior is callback-specific.
1289 Returns: Bytes read
1290 """
1291 data = read(size)
1293 # maintain a trailer of the last 20 bytes we've read
1294 n = len(data)
1295 self._offset += n
1296 tn = len(self._trailer)
1297 if n >= 20:
1298 to_pop = tn
1299 to_add = 20
1300 else:
1301 to_pop = max(n + tn - 20, 0)
1302 to_add = n
1303 self.sha.update(
1304 bytes(bytearray([self._trailer.popleft() for _ in range(to_pop)]))
1305 )
1306 self._trailer.extend(data[-to_add:])
1308 # hash everything but the trailer
1309 self.sha.update(data[:-to_add])
1310 return data
1312 def _buf_len(self) -> int:
1313 buf = self._rbuf
1314 start = buf.tell()
1315 buf.seek(0, SEEK_END)
1316 end = buf.tell()
1317 buf.seek(start)
1318 return end - start
1320 @property
1321 def offset(self) -> int:
1322 """Return current offset in the stream."""
1323 return self._offset - self._buf_len()
1325 def read(self, size: int) -> bytes:
1326 """Read, blocking until size bytes are read."""
1327 buf_len = self._buf_len()
1328 if buf_len >= size:
1329 return self._rbuf.read(size)
1330 buf_data = self._rbuf.read()
1331 self._rbuf = BytesIO()
1332 return buf_data + self._read(self.read_all, size - buf_len)
1334 def recv(self, size: int) -> bytes:
1335 """Read up to size bytes, blocking until one byte is read."""
1336 buf_len = self._buf_len()
1337 if buf_len:
1338 data = self._rbuf.read(size)
1339 if size >= buf_len:
1340 self._rbuf = BytesIO()
1341 return data
1342 return self._read(self.read_some, size)
1344 def __len__(self) -> int:
1345 """Return the number of objects in this pack."""
1346 return self._num_objects
1348 def read_objects(self, compute_crc32: bool = False) -> Iterator[UnpackedObject]:
1349 """Read the objects in this pack file.
1351 Args:
1352 compute_crc32: If True, compute the CRC32 of the compressed
1353 data. If False, the returned CRC32 will be None.
1354 Returns: Iterator over UnpackedObjects with the following members set:
1355 offset
1356 obj_type_num
1357 obj_chunks (for non-delta types)
1358 delta_base (for delta types)
1359 decomp_chunks
1360 decomp_len
1361 crc32 (if compute_crc32 is True)
1363 Raises:
1364 ChecksumMismatch: if the checksum of the pack contents does not
1365 match the checksum in the pack trailer.
1366 zlib.error: if an error occurred during zlib decompression.
1367 IOError: if an error occurred writing to the output file.
1368 """
1369 _pack_version, self._num_objects = read_pack_header(self.read)
1371 for _ in range(self._num_objects):
1372 offset = self.offset
1373 unpacked, unused = unpack_object(
1374 self.read,
1375 read_some=self.recv,
1376 compute_crc32=compute_crc32,
1377 zlib_bufsize=self._zlib_bufsize,
1378 )
1379 unpacked.offset = offset
1381 # prepend any unused data to current read buffer
1382 buf = BytesIO()
1383 buf.write(unused)
1384 buf.write(self._rbuf.read())
1385 buf.seek(0)
1386 self._rbuf = buf
1388 yield unpacked
1390 if self._buf_len() < 20:
1391 # If the read buffer is full, then the last read() got the whole
1392 # trailer off the wire. If not, it means there is still some of the
1393 # trailer to read. We need to read() all 20 bytes; N come from the
1394 # read buffer and (20 - N) come from the wire.
1395 self.read(20)
1397 pack_sha = bytearray(self._trailer)
1398 if pack_sha != self.sha.digest():
1399 raise ChecksumMismatch(sha_to_hex(bytes(pack_sha)), self.sha.hexdigest())
1402class PackStreamCopier(PackStreamReader):
1403 """Class to verify a pack stream as it is being read.
1405 The pack is read from a ReceivableProtocol using read() or recv() as
1406 appropriate and written out to the given file-like object.
1407 """
1409 def __init__(
1410 self,
1411 read_all: Callable[[int], bytes],
1412 read_some: Callable[[int], bytes] | None,
1413 outfile: IO[bytes],
1414 delta_iter: Optional["DeltaChainIterator[UnpackedObject]"] = None,
1415 ) -> None:
1416 """Initialize the copier.
1418 Args:
1419 read_all: Read function that blocks until the number of
1420 requested bytes are read.
1421 read_some: Read function that returns at least one byte, but may
1422 not return the number of bytes requested.
1423 outfile: File-like object to write output through.
1424 delta_iter: Optional DeltaChainIterator to record deltas as we
1425 read them.
1426 """
1427 super().__init__(read_all, read_some=read_some)
1428 self.outfile = outfile
1429 self._delta_iter = delta_iter
1431 def _read(self, read: Callable[[int], bytes], size: int) -> bytes:
1432 """Read data from the read callback and write it to the file."""
1433 data = super()._read(read, size)
1434 self.outfile.write(data)
1435 return data
1437 def verify(self, progress: Callable[..., None] | None = None) -> None:
1438 """Verify a pack stream and write it to the output file.
1440 See PackStreamReader.iterobjects for a list of exceptions this may
1441 throw.
1442 """
1443 i = 0 # default count of entries if read_objects() is empty
1444 for i, unpacked in enumerate(self.read_objects()):
1445 if self._delta_iter:
1446 self._delta_iter.record(unpacked)
1447 if progress is not None:
1448 progress(f"copying pack entries: {i}/{len(self)}\r".encode("ascii"))
1449 if progress is not None:
1450 progress(f"copied {i} pack entries\n".encode("ascii"))
1453def obj_sha(type: int, chunks: bytes | Iterable[bytes]) -> bytes:
1454 """Compute the SHA for a numeric type and object chunks."""
1455 sha = sha1()
1456 sha.update(object_header(type, chunks_length(chunks)))
1457 if isinstance(chunks, bytes):
1458 sha.update(chunks)
1459 else:
1460 for chunk in chunks:
1461 sha.update(chunk)
1462 return sha.digest()
1465def compute_file_sha(
1466 f: IO[bytes], start_ofs: int = 0, end_ofs: int = 0, buffer_size: int = 1 << 16
1467) -> "HashObject":
1468 """Hash a portion of a file into a new SHA.
1470 Args:
1471 f: A file-like object to read from that supports seek().
1472 start_ofs: The offset in the file to start reading at.
1473 end_ofs: The offset in the file to end reading at, relative to the
1474 end of the file.
1475 buffer_size: A buffer size for reading.
1476 Returns: A new SHA object updated with data read from the file.
1477 """
1478 sha = sha1()
1479 f.seek(0, SEEK_END)
1480 length = f.tell()
1481 if (end_ofs < 0 and length + end_ofs < start_ofs) or end_ofs > length:
1482 raise AssertionError(
1483 f"Attempt to read beyond file length. start_ofs: {start_ofs}, end_ofs: {end_ofs}, file length: {length}"
1484 )
1485 todo = length + end_ofs - start_ofs
1486 f.seek(start_ofs)
1487 while todo:
1488 data = f.read(min(todo, buffer_size))
1489 sha.update(data)
1490 todo -= len(data)
1491 return sha
1494class PackData:
1495 """The data contained in a packfile.
1497 Pack files can be accessed both sequentially for exploding a pack, and
1498 directly with the help of an index to retrieve a specific object.
1500 The objects within are either complete or a delta against another.
1502 The header is variable length. If the MSB of each byte is set then it
1503 indicates that the subsequent byte is still part of the header.
1504 For the first byte the next MS bits are the type, which tells you the type
1505 of object, and whether it is a delta. The LS byte is the lowest bits of the
1506 size. For each subsequent byte the LS 7 bits are the next MS bits of the
1507 size, i.e. the last byte of the header contains the MS bits of the size.
1509 For the complete objects the data is stored as zlib deflated data.
1510 The size in the header is the uncompressed object size, so to uncompress
1511 you need to just keep feeding data to zlib until you get an object back,
1512 or it errors on bad data. This is done here by just giving the complete
1513 buffer from the start of the deflated object on. This is bad, but until I
1514 get mmap sorted out it will have to do.
1516 Currently there are no integrity checks done. Also no attempt is made to
1517 try and detect the delta case, or a request for an object at the wrong
1518 position. It will all just throw a zlib or KeyError.
1519 """
1521 def __init__(
1522 self,
1523 filename: str | os.PathLike[str],
1524 file: IO[bytes] | None = None,
1525 size: int | None = None,
1526 *,
1527 delta_window_size: int | None = None,
1528 window_memory: int | None = None,
1529 delta_cache_size: int | None = None,
1530 depth: int | None = None,
1531 threads: int | None = None,
1532 big_file_threshold: int | None = None,
1533 ) -> None:
1534 """Create a PackData object representing the pack in the given filename.
1536 The file must exist and stay readable until the object is disposed of.
1537 It must also stay the same size. It will be mapped whenever needed.
1539 Currently there is a restriction on the size of the pack as the python
1540 mmap implementation is flawed.
1541 """
1542 self._filename = filename
1543 self._size = size
1544 self._header_size = 12
1545 self.delta_window_size = delta_window_size
1546 self.window_memory = window_memory
1547 self.delta_cache_size = delta_cache_size
1548 self.depth = depth
1549 self.threads = threads
1550 self.big_file_threshold = big_file_threshold
1551 self._file: IO[bytes]
1553 if file is None:
1554 self._file = GitFile(self._filename, "rb")
1555 else:
1556 self._file = file
1557 (_version, self._num_objects) = read_pack_header(self._file.read)
1559 # Use delta_cache_size config if available, otherwise default
1560 cache_size = delta_cache_size or (1024 * 1024 * 20)
1561 self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]](
1562 cache_size, compute_size=_compute_object_size
1563 )
1565 @property
1566 def filename(self) -> str:
1567 """Get the filename of the pack file.
1569 Returns:
1570 Base filename without directory path
1571 """
1572 return os.path.basename(self._filename)
1574 @property
1575 def path(self) -> str | os.PathLike[str]:
1576 """Get the full path of the pack file.
1578 Returns:
1579 Full path to the pack file
1580 """
1581 return self._filename
1583 @classmethod
1584 def from_file(cls, file: IO[bytes], size: int | None = None) -> "PackData":
1585 """Create a PackData object from an open file.
1587 Args:
1588 file: Open file object
1589 size: Optional file size
1591 Returns:
1592 PackData instance
1593 """
1594 return cls(str(file), file=file, size=size)
1596 @classmethod
1597 def from_path(cls, path: str | os.PathLike[str]) -> "PackData":
1598 """Create a PackData object from a file path.
1600 Args:
1601 path: Path to the pack file
1603 Returns:
1604 PackData instance
1605 """
1606 return cls(filename=path)
1608 def close(self) -> None:
1609 """Close the underlying pack file."""
1610 self._file.close()
1612 def __enter__(self) -> "PackData":
1613 """Enter context manager."""
1614 return self
1616 def __exit__(
1617 self,
1618 exc_type: type | None,
1619 exc_val: BaseException | None,
1620 exc_tb: TracebackType | None,
1621 ) -> None:
1622 """Exit context manager."""
1623 self.close()
1625 def __eq__(self, other: object) -> bool:
1626 """Check equality with another object."""
1627 if isinstance(other, PackData):
1628 return self.get_stored_checksum() == other.get_stored_checksum()
1629 return False
1631 def _get_size(self) -> int:
1632 if self._size is not None:
1633 return self._size
1634 self._size = os.path.getsize(self._filename)
1635 if self._size < self._header_size:
1636 errmsg = f"{self._filename} is too small for a packfile ({self._size} < {self._header_size})"
1637 raise AssertionError(errmsg)
1638 return self._size
1640 def __len__(self) -> int:
1641 """Returns the number of objects in this pack."""
1642 return self._num_objects
1644 def calculate_checksum(self) -> bytes:
1645 """Calculate the checksum for this pack.
1647 Returns: 20-byte binary SHA1 digest
1648 """
1649 return compute_file_sha(self._file, end_ofs=-20).digest()
1651 def iter_unpacked(self, *, include_comp: bool = False) -> Iterator[UnpackedObject]:
1652 """Iterate over unpacked objects in the pack."""
1653 self._file.seek(self._header_size)
1655 if self._num_objects is None:
1656 return
1658 for _ in range(self._num_objects):
1659 offset = self._file.tell()
1660 unpacked, unused = unpack_object(
1661 self._file.read, compute_crc32=False, include_comp=include_comp
1662 )
1663 unpacked.offset = offset
1664 yield unpacked
1665 # Back up over unused data.
1666 self._file.seek(-len(unused), SEEK_CUR)
1668 def iterentries(
1669 self,
1670 progress: Callable[[int, int], None] | None = None,
1671 resolve_ext_ref: ResolveExtRefFn | None = None,
1672 ) -> Iterator[tuple[bytes, int, int | None]]:
1673 """Yield entries summarizing the contents of this pack.
1675 Args:
1676 progress: Progress function, called with current and total
1677 object count.
1678 resolve_ext_ref: Optional function to resolve external references
1679 Returns: iterator of tuples with (sha, offset, crc32)
1680 """
1681 num_objects = self._num_objects
1682 indexer = PackIndexer.for_pack_data(self, resolve_ext_ref=resolve_ext_ref)
1683 for i, result in enumerate(indexer):
1684 if progress is not None:
1685 progress(i, num_objects)
1686 yield result
1688 def sorted_entries(
1689 self,
1690 progress: ProgressFn | None = None,
1691 resolve_ext_ref: ResolveExtRefFn | None = None,
1692 ) -> list[tuple[bytes, int, int]]:
1693 """Return entries in this pack, sorted by SHA.
1695 Args:
1696 progress: Progress function, called with current and total
1697 object count
1698 resolve_ext_ref: Optional function to resolve external references
1699 Returns: Iterator of tuples with (sha, offset, crc32)
1700 """
1701 return sorted(
1702 self.iterentries(progress=progress, resolve_ext_ref=resolve_ext_ref) # type: ignore
1703 )
1705 def create_index_v1(
1706 self,
1707 filename: str,
1708 progress: Callable[..., None] | None = None,
1709 resolve_ext_ref: ResolveExtRefFn | None = None,
1710 ) -> bytes:
1711 """Create a version 1 file for this data file.
1713 Args:
1714 filename: Index filename.
1715 progress: Progress report function
1716 resolve_ext_ref: Optional function to resolve external references
1717 Returns: Checksum of index file
1718 """
1719 entries = self.sorted_entries(
1720 progress=progress, resolve_ext_ref=resolve_ext_ref
1721 )
1722 checksum = self.calculate_checksum()
1723 with GitFile(filename, "wb") as f:
1724 write_pack_index_v1(
1725 f,
1726 entries,
1727 checksum,
1728 )
1729 return checksum
1731 def create_index_v2(
1732 self,
1733 filename: str,
1734 progress: Callable[..., None] | None = None,
1735 resolve_ext_ref: ResolveExtRefFn | None = None,
1736 ) -> bytes:
1737 """Create a version 2 index file for this data file.
1739 Args:
1740 filename: Index filename.
1741 progress: Progress report function
1742 resolve_ext_ref: Optional function to resolve external references
1743 Returns: Checksum of index file
1744 """
1745 entries = self.sorted_entries(
1746 progress=progress, resolve_ext_ref=resolve_ext_ref
1747 )
1748 with GitFile(filename, "wb") as f:
1749 return write_pack_index_v2(f, entries, self.calculate_checksum())
1751 def create_index_v3(
1752 self,
1753 filename: str,
1754 progress: Callable[..., None] | None = None,
1755 resolve_ext_ref: ResolveExtRefFn | None = None,
1756 hash_algorithm: int = 1,
1757 ) -> bytes:
1758 """Create a version 3 index file for this data file.
1760 Args:
1761 filename: Index filename.
1762 progress: Progress report function
1763 resolve_ext_ref: Function to resolve external references
1764 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
1765 Returns: Checksum of index file
1766 """
1767 entries = self.sorted_entries(
1768 progress=progress, resolve_ext_ref=resolve_ext_ref
1769 )
1770 with GitFile(filename, "wb") as f:
1771 return write_pack_index_v3(
1772 f, entries, self.calculate_checksum(), hash_algorithm
1773 )
1775 def create_index(
1776 self,
1777 filename: str,
1778 progress: Callable[..., None] | None = None,
1779 version: int = 2,
1780 resolve_ext_ref: ResolveExtRefFn | None = None,
1781 hash_algorithm: int = 1,
1782 ) -> bytes:
1783 """Create an index file for this data file.
1785 Args:
1786 filename: Index filename.
1787 progress: Progress report function
1788 version: Index version (1, 2, or 3)
1789 resolve_ext_ref: Function to resolve external references
1790 hash_algorithm: Hash algorithm identifier for v3 (1 = SHA-1, 2 = SHA-256)
1791 Returns: Checksum of index file
1792 """
1793 if version == 1:
1794 return self.create_index_v1(
1795 filename, progress, resolve_ext_ref=resolve_ext_ref
1796 )
1797 elif version == 2:
1798 return self.create_index_v2(
1799 filename, progress, resolve_ext_ref=resolve_ext_ref
1800 )
1801 elif version == 3:
1802 return self.create_index_v3(
1803 filename,
1804 progress,
1805 resolve_ext_ref=resolve_ext_ref,
1806 hash_algorithm=hash_algorithm,
1807 )
1808 else:
1809 raise ValueError(f"unknown index format {version}")
1811 def get_stored_checksum(self) -> bytes:
1812 """Return the expected checksum stored in this pack."""
1813 self._file.seek(-20, SEEK_END)
1814 return self._file.read(20)
1816 def check(self) -> None:
1817 """Check the consistency of this pack."""
1818 actual = self.calculate_checksum()
1819 stored = self.get_stored_checksum()
1820 if actual != stored:
1821 raise ChecksumMismatch(stored, actual)
1823 def get_unpacked_object_at(
1824 self, offset: int, *, include_comp: bool = False
1825 ) -> UnpackedObject:
1826 """Given offset in the packfile return a UnpackedObject."""
1827 assert offset >= self._header_size
1828 self._file.seek(offset)
1829 unpacked, _ = unpack_object(self._file.read, include_comp=include_comp)
1830 unpacked.offset = offset
1831 return unpacked
1833 def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]:
1834 """Given an offset in to the packfile return the object that is there.
1836 Using the associated index the location of an object can be looked up,
1837 and then the packfile can be asked directly for that object using this
1838 function.
1839 """
1840 try:
1841 return self._offset_cache[offset]
1842 except KeyError:
1843 pass
1844 unpacked = self.get_unpacked_object_at(offset, include_comp=False)
1845 return (unpacked.pack_type_num, unpacked._obj())
1848T = TypeVar("T")
1851class DeltaChainIterator(Generic[T]):
1852 """Abstract iterator over pack data based on delta chains.
1854 Each object in the pack is guaranteed to be inflated exactly once,
1855 regardless of how many objects reference it as a delta base. As a result,
1856 memory usage is proportional to the length of the longest delta chain.
1858 Subclasses can override _result to define the result type of the iterator.
1859 By default, results are UnpackedObjects with the following members set:
1861 * offset
1862 * obj_type_num
1863 * obj_chunks
1864 * pack_type_num
1865 * delta_base (for delta types)
1866 * comp_chunks (if _include_comp is True)
1867 * decomp_chunks
1868 * decomp_len
1869 * crc32 (if _compute_crc32 is True)
1870 """
1872 _compute_crc32 = False
1873 _include_comp = False
1875 def __init__(
1876 self,
1877 file_obj: IO[bytes] | None,
1878 *,
1879 resolve_ext_ref: ResolveExtRefFn | None = None,
1880 ) -> None:
1881 """Initialize DeltaChainIterator.
1883 Args:
1884 file_obj: File object to read pack data from
1885 resolve_ext_ref: Optional function to resolve external references
1886 """
1887 self._file = file_obj
1888 self._resolve_ext_ref = resolve_ext_ref
1889 self._pending_ofs: dict[int, list[int]] = defaultdict(list)
1890 self._pending_ref: dict[bytes, list[int]] = defaultdict(list)
1891 self._full_ofs: list[tuple[int, int]] = []
1892 self._ext_refs: list[bytes] = []
1894 @classmethod
1895 def for_pack_data(
1896 cls, pack_data: PackData, resolve_ext_ref: ResolveExtRefFn | None = None
1897 ) -> "DeltaChainIterator[T]":
1898 """Create a DeltaChainIterator from pack data.
1900 Args:
1901 pack_data: PackData object to iterate
1902 resolve_ext_ref: Optional function to resolve external refs
1904 Returns:
1905 DeltaChainIterator instance
1906 """
1907 walker = cls(None, resolve_ext_ref=resolve_ext_ref)
1908 walker.set_pack_data(pack_data)
1909 for unpacked in pack_data.iter_unpacked(include_comp=False):
1910 walker.record(unpacked)
1911 return walker
1913 @classmethod
1914 def for_pack_subset(
1915 cls,
1916 pack: "Pack",
1917 shas: Iterable[bytes],
1918 *,
1919 allow_missing: bool = False,
1920 resolve_ext_ref: ResolveExtRefFn | None = None,
1921 ) -> "DeltaChainIterator[T]":
1922 """Create a DeltaChainIterator for a subset of objects.
1924 Args:
1925 pack: Pack object containing the data
1926 shas: Iterable of object SHAs to include
1927 allow_missing: If True, skip missing objects
1928 resolve_ext_ref: Optional function to resolve external refs
1930 Returns:
1931 DeltaChainIterator instance
1932 """
1933 walker = cls(None, resolve_ext_ref=resolve_ext_ref)
1934 walker.set_pack_data(pack.data)
1935 todo = set()
1936 for sha in shas:
1937 assert isinstance(sha, bytes)
1938 try:
1939 off = pack.index.object_offset(sha)
1940 except KeyError:
1941 if not allow_missing:
1942 raise
1943 else:
1944 todo.add(off)
1945 done = set()
1946 while todo:
1947 off = todo.pop()
1948 unpacked = pack.data.get_unpacked_object_at(off)
1949 walker.record(unpacked)
1950 done.add(off)
1951 base_ofs = None
1952 if unpacked.pack_type_num == OFS_DELTA:
1953 assert unpacked.offset is not None
1954 assert unpacked.delta_base is not None
1955 assert isinstance(unpacked.delta_base, int)
1956 base_ofs = unpacked.offset - unpacked.delta_base
1957 elif unpacked.pack_type_num == REF_DELTA:
1958 with suppress(KeyError):
1959 assert isinstance(unpacked.delta_base, bytes)
1960 base_ofs = pack.index.object_index(unpacked.delta_base)
1961 if base_ofs is not None and base_ofs not in done:
1962 todo.add(base_ofs)
1963 return walker
1965 def record(self, unpacked: UnpackedObject) -> None:
1966 """Record an unpacked object for later processing.
1968 Args:
1969 unpacked: UnpackedObject to record
1970 """
1971 type_num = unpacked.pack_type_num
1972 offset = unpacked.offset
1973 assert offset is not None
1974 if type_num == OFS_DELTA:
1975 assert unpacked.delta_base is not None
1976 assert isinstance(unpacked.delta_base, int)
1977 base_offset = offset - unpacked.delta_base
1978 self._pending_ofs[base_offset].append(offset)
1979 elif type_num == REF_DELTA:
1980 assert isinstance(unpacked.delta_base, bytes)
1981 self._pending_ref[unpacked.delta_base].append(offset)
1982 else:
1983 self._full_ofs.append((offset, type_num))
1985 def set_pack_data(self, pack_data: PackData) -> None:
1986 """Set the pack data for iteration.
1988 Args:
1989 pack_data: PackData object to use
1990 """
1991 self._file = pack_data._file
1993 def _walk_all_chains(self) -> Iterator[T]:
1994 for offset, type_num in self._full_ofs:
1995 yield from self._follow_chain(offset, type_num, None)
1996 yield from self._walk_ref_chains()
1997 assert not self._pending_ofs, repr(self._pending_ofs)
1999 def _ensure_no_pending(self) -> None:
2000 if self._pending_ref:
2001 raise UnresolvedDeltas([sha_to_hex(s) for s in self._pending_ref])
2003 def _walk_ref_chains(self) -> Iterator[T]:
2004 if not self._resolve_ext_ref:
2005 self._ensure_no_pending()
2006 return
2008 for base_sha, pending in sorted(self._pending_ref.items()):
2009 if base_sha not in self._pending_ref:
2010 continue
2011 try:
2012 type_num, chunks = self._resolve_ext_ref(base_sha)
2013 except KeyError:
2014 # Not an external ref, but may depend on one. Either it will
2015 # get popped via a _follow_chain call, or we will raise an
2016 # error below.
2017 continue
2018 self._ext_refs.append(base_sha)
2019 self._pending_ref.pop(base_sha)
2020 for new_offset in pending:
2021 yield from self._follow_chain(new_offset, type_num, chunks) # type: ignore[arg-type]
2023 self._ensure_no_pending()
2025 def _result(self, unpacked: UnpackedObject) -> T:
2026 raise NotImplementedError
2028 def _resolve_object(
2029 self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None
2030 ) -> UnpackedObject:
2031 assert self._file is not None
2032 self._file.seek(offset)
2033 unpacked, _ = unpack_object(
2034 self._file.read,
2035 include_comp=self._include_comp,
2036 compute_crc32=self._compute_crc32,
2037 )
2038 unpacked.offset = offset
2039 if base_chunks is None:
2040 assert unpacked.pack_type_num == obj_type_num
2041 else:
2042 assert unpacked.pack_type_num in DELTA_TYPES
2043 unpacked.obj_type_num = obj_type_num
2044 unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks)
2045 return unpacked
2047 def _follow_chain(
2048 self, offset: int, obj_type_num: int, base_chunks: list[bytes] | None
2049 ) -> Iterator[T]:
2050 # Unlike PackData.get_object_at, there is no need to cache offsets as
2051 # this approach by design inflates each object exactly once.
2052 todo = [(offset, obj_type_num, base_chunks)]
2053 while todo:
2054 (offset, obj_type_num, base_chunks) = todo.pop()
2055 unpacked = self._resolve_object(offset, obj_type_num, base_chunks)
2056 yield self._result(unpacked)
2058 assert unpacked.offset is not None
2059 unblocked = chain(
2060 self._pending_ofs.pop(unpacked.offset, []),
2061 self._pending_ref.pop(unpacked.sha(), []),
2062 )
2063 todo.extend(
2064 (new_offset, unpacked.obj_type_num, unpacked.obj_chunks) # type: ignore
2065 for new_offset in unblocked
2066 )
2068 def __iter__(self) -> Iterator[T]:
2069 """Iterate over objects in the pack."""
2070 return self._walk_all_chains()
2072 def ext_refs(self) -> list[bytes]:
2073 """Return external references."""
2074 return self._ext_refs
2077class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]):
2078 """Delta chain iterator that yield unpacked objects."""
2080 def _result(self, unpacked: UnpackedObject) -> UnpackedObject:
2081 """Return the unpacked object.
2083 Args:
2084 unpacked: The unpacked object
2086 Returns:
2087 The unpacked object unchanged
2088 """
2089 return unpacked
2092class PackIndexer(DeltaChainIterator[PackIndexEntry]):
2093 """Delta chain iterator that yields index entries."""
2095 _compute_crc32 = True
2097 def _result(self, unpacked: UnpackedObject) -> tuple[bytes, int, int | None]:
2098 """Convert unpacked object to pack index entry.
2100 Args:
2101 unpacked: The unpacked object
2103 Returns:
2104 Tuple of (sha, offset, crc32) for index entry
2105 """
2106 assert unpacked.offset is not None
2107 return unpacked.sha(), unpacked.offset, unpacked.crc32
2110class PackInflater(DeltaChainIterator[ShaFile]):
2111 """Delta chain iterator that yields ShaFile objects."""
2113 def _result(self, unpacked: UnpackedObject) -> ShaFile:
2114 """Convert unpacked object to ShaFile.
2116 Args:
2117 unpacked: The unpacked object
2119 Returns:
2120 ShaFile object from the unpacked data
2121 """
2122 return unpacked.sha_file()
2125class SHA1Reader(BinaryIO):
2126 """Wrapper for file-like object that remembers the SHA1 of its data."""
2128 def __init__(self, f: IO[bytes]) -> None:
2129 """Initialize SHA1Reader.
2131 Args:
2132 f: File-like object to wrap
2133 """
2134 self.f = f
2135 self.sha1 = sha1(b"")
2137 def read(self, size: int = -1) -> bytes:
2138 """Read bytes and update SHA1.
2140 Args:
2141 size: Number of bytes to read, -1 for all
2143 Returns:
2144 Bytes read from file
2145 """
2146 data = self.f.read(size)
2147 self.sha1.update(data)
2148 return data
2150 def check_sha(self, allow_empty: bool = False) -> None:
2151 """Check if the SHA1 matches the expected value.
2153 Args:
2154 allow_empty: Allow empty SHA1 hash
2156 Raises:
2157 ChecksumMismatch: If SHA1 doesn't match
2158 """
2159 stored = self.f.read(20)
2160 # If git option index.skipHash is set the index will be empty
2161 if stored != self.sha1.digest() and (
2162 not allow_empty
2163 or sha_to_hex(stored) != b"0000000000000000000000000000000000000000"
2164 ):
2165 raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored))
2167 def close(self) -> None:
2168 """Close the underlying file."""
2169 return self.f.close()
2171 def tell(self) -> int:
2172 """Return current file position."""
2173 return self.f.tell()
2175 # BinaryIO abstract methods
2176 def readable(self) -> bool:
2177 """Check if file is readable."""
2178 return True
2180 def writable(self) -> bool:
2181 """Check if file is writable."""
2182 return False
2184 def seekable(self) -> bool:
2185 """Check if file is seekable."""
2186 return getattr(self.f, "seekable", lambda: False)()
2188 def seek(self, offset: int, whence: int = 0) -> int:
2189 """Seek to position in file.
2191 Args:
2192 offset: Position offset
2193 whence: Reference point (0=start, 1=current, 2=end)
2195 Returns:
2196 New file position
2197 """
2198 return self.f.seek(offset, whence)
2200 def flush(self) -> None:
2201 """Flush the file buffer."""
2202 if hasattr(self.f, "flush"):
2203 self.f.flush()
2205 def readline(self, size: int = -1) -> bytes:
2206 """Read a line from the file.
2208 Args:
2209 size: Maximum bytes to read
2211 Returns:
2212 Line read from file
2213 """
2214 return self.f.readline(size)
2216 def readlines(self, hint: int = -1) -> list[bytes]:
2217 """Read all lines from the file.
2219 Args:
2220 hint: Approximate number of bytes to read
2222 Returns:
2223 List of lines
2224 """
2225 return self.f.readlines(hint)
2227 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override]
2228 """Write multiple lines to the file (not supported)."""
2229 raise UnsupportedOperation("writelines")
2231 def write(self, data: bytes, /) -> int: # type: ignore[override]
2232 """Write data to the file (not supported)."""
2233 raise UnsupportedOperation("write")
2235 def __enter__(self) -> "SHA1Reader":
2236 """Enter context manager."""
2237 return self
2239 def __exit__(
2240 self,
2241 type: type | None,
2242 value: BaseException | None,
2243 traceback: TracebackType | None,
2244 ) -> None:
2245 """Exit context manager and close file."""
2246 self.close()
2248 def __iter__(self) -> "SHA1Reader":
2249 """Return iterator for reading file lines."""
2250 return self
2252 def __next__(self) -> bytes:
2253 """Get next line from file.
2255 Returns:
2256 Next line
2258 Raises:
2259 StopIteration: When no more lines
2260 """
2261 line = self.readline()
2262 if not line:
2263 raise StopIteration
2264 return line
2266 def fileno(self) -> int:
2267 """Return file descriptor number."""
2268 return self.f.fileno()
2270 def isatty(self) -> bool:
2271 """Check if file is a terminal."""
2272 return getattr(self.f, "isatty", lambda: False)()
2274 def truncate(self, size: int | None = None) -> int:
2275 """Not supported for read-only file.
2277 Raises:
2278 UnsupportedOperation: Always raised
2279 """
2280 raise UnsupportedOperation("truncate")
2283class SHA1Writer(BinaryIO):
2284 """Wrapper for file-like object that remembers the SHA1 of its data."""
2286 def __init__(self, f: BinaryIO | IO[bytes]) -> None:
2287 """Initialize SHA1Writer.
2289 Args:
2290 f: File-like object to wrap
2291 """
2292 self.f = f
2293 self.length = 0
2294 self.sha1 = sha1(b"")
2295 self.digest: bytes | None = None
2297 def write(self, data: bytes | bytearray | memoryview, /) -> int: # type: ignore[override]
2298 """Write data and update SHA1.
2300 Args:
2301 data: Data to write
2303 Returns:
2304 Number of bytes written
2305 """
2306 self.sha1.update(data)
2307 written = self.f.write(data)
2308 self.length += written
2309 return written
2311 def write_sha(self) -> bytes:
2312 """Write the SHA1 digest to the file.
2314 Returns:
2315 The SHA1 digest bytes
2316 """
2317 sha = self.sha1.digest()
2318 assert len(sha) == 20
2319 self.f.write(sha)
2320 self.length += len(sha)
2321 return sha
2323 def close(self) -> None:
2324 """Close the pack file and finalize the SHA."""
2325 self.digest = self.write_sha()
2326 self.f.close()
2328 def offset(self) -> int:
2329 """Get the total number of bytes written.
2331 Returns:
2332 Total bytes written
2333 """
2334 return self.length
2336 def tell(self) -> int:
2337 """Return current file position."""
2338 return self.f.tell()
2340 # BinaryIO abstract methods
2341 def readable(self) -> bool:
2342 """Check if file is readable."""
2343 return False
2345 def writable(self) -> bool:
2346 """Check if file is writable."""
2347 return True
2349 def seekable(self) -> bool:
2350 """Check if file is seekable."""
2351 return getattr(self.f, "seekable", lambda: False)()
2353 def seek(self, offset: int, whence: int = 0) -> int:
2354 """Seek to position in file.
2356 Args:
2357 offset: Position offset
2358 whence: Reference point (0=start, 1=current, 2=end)
2360 Returns:
2361 New file position
2362 """
2363 return self.f.seek(offset, whence)
2365 def flush(self) -> None:
2366 """Flush the file buffer."""
2367 if hasattr(self.f, "flush"):
2368 self.f.flush()
2370 def readline(self, size: int = -1) -> bytes:
2371 """Not supported for write-only file.
2373 Raises:
2374 UnsupportedOperation: Always raised
2375 """
2376 raise UnsupportedOperation("readline")
2378 def readlines(self, hint: int = -1) -> list[bytes]:
2379 """Not supported for write-only file.
2381 Raises:
2382 UnsupportedOperation: Always raised
2383 """
2384 raise UnsupportedOperation("readlines")
2386 def writelines(self, lines: Iterable[bytes], /) -> None: # type: ignore[override]
2387 """Write multiple lines to the file.
2389 Args:
2390 lines: Iterable of lines to write
2391 """
2392 for line in lines:
2393 self.write(line)
2395 def read(self, size: int = -1) -> bytes:
2396 """Not supported for write-only file.
2398 Raises:
2399 UnsupportedOperation: Always raised
2400 """
2401 raise UnsupportedOperation("read")
2403 def __enter__(self) -> "SHA1Writer":
2404 """Enter context manager."""
2405 return self
2407 def __exit__(
2408 self,
2409 type: type | None,
2410 value: BaseException | None,
2411 traceback: TracebackType | None,
2412 ) -> None:
2413 """Exit context manager and close file."""
2414 self.close()
2416 def __iter__(self) -> "SHA1Writer":
2417 """Return iterator."""
2418 return self
2420 def __next__(self) -> bytes:
2421 """Not supported for write-only file.
2423 Raises:
2424 UnsupportedOperation: Always raised
2425 """
2426 raise UnsupportedOperation("__next__")
2428 def fileno(self) -> int:
2429 """Return file descriptor number."""
2430 return self.f.fileno()
2432 def isatty(self) -> bool:
2433 """Check if file is a terminal."""
2434 return getattr(self.f, "isatty", lambda: False)()
2436 def truncate(self, size: int | None = None) -> int:
2437 """Not supported for write-only file.
2439 Raises:
2440 UnsupportedOperation: Always raised
2441 """
2442 raise UnsupportedOperation("truncate")
2445def pack_object_header(
2446 type_num: int, delta_base: bytes | int | None, size: int
2447) -> bytearray:
2448 """Create a pack object header for the given object info.
2450 Args:
2451 type_num: Numeric type of the object.
2452 delta_base: Delta base offset or ref, or None for whole objects.
2453 size: Uncompressed object size.
2454 Returns: A header for a packed object.
2455 """
2456 header = []
2457 c = (type_num << 4) | (size & 15)
2458 size >>= 4
2459 while size:
2460 header.append(c | 0x80)
2461 c = size & 0x7F
2462 size >>= 7
2463 header.append(c)
2464 if type_num == OFS_DELTA:
2465 assert isinstance(delta_base, int)
2466 ret = [delta_base & 0x7F]
2467 delta_base >>= 7
2468 while delta_base:
2469 delta_base -= 1
2470 ret.insert(0, 0x80 | (delta_base & 0x7F))
2471 delta_base >>= 7
2472 header.extend(ret)
2473 elif type_num == REF_DELTA:
2474 assert isinstance(delta_base, bytes)
2475 assert len(delta_base) == 20
2476 header += delta_base
2477 return bytearray(header)
2480def pack_object_chunks(
2481 type: int,
2482 object: list[bytes] | tuple[bytes | int, list[bytes]],
2483 compression_level: int = -1,
2484) -> Iterator[bytes]:
2485 """Generate chunks for a pack object.
2487 Args:
2488 type: Numeric type of the object
2489 object: Object to write
2490 compression_level: the zlib compression level
2491 Returns: Chunks
2492 """
2493 if type in DELTA_TYPES:
2494 if isinstance(object, tuple):
2495 delta_base, object = object
2496 else:
2497 raise TypeError("Delta types require a tuple of (delta_base, object)")
2498 else:
2499 delta_base = None
2501 # Convert object to list of bytes chunks
2502 if isinstance(object, bytes):
2503 chunks = [object]
2504 elif isinstance(object, list):
2505 chunks = object
2506 elif isinstance(object, ShaFile):
2507 chunks = object.as_raw_chunks()
2508 else:
2509 # Shouldn't reach here with proper typing
2510 raise TypeError(f"Unexpected object type: {object.__class__.__name__}")
2512 yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks))))
2513 compressor = zlib.compressobj(level=compression_level)
2514 for data in chunks:
2515 yield compressor.compress(data)
2516 yield compressor.flush()
2519def write_pack_object(
2520 write: Callable[[bytes], int],
2521 type: int,
2522 object: list[bytes] | tuple[bytes | int, list[bytes]],
2523 sha: Optional["HashObject"] = None,
2524 compression_level: int = -1,
2525) -> int:
2526 """Write pack object to a file.
2528 Args:
2529 write: Write function to use
2530 type: Numeric type of the object
2531 object: Object to write
2532 sha: Optional SHA-1 hasher to update
2533 compression_level: the zlib compression level
2534 Returns: CRC32 checksum of the written object
2535 """
2536 crc32 = 0
2537 for chunk in pack_object_chunks(type, object, compression_level=compression_level):
2538 write(chunk)
2539 if sha is not None:
2540 sha.update(chunk)
2541 crc32 = binascii.crc32(chunk, crc32)
2542 return crc32 & 0xFFFFFFFF
2545def write_pack(
2546 filename: str,
2547 objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]],
2548 *,
2549 deltify: bool | None = None,
2550 delta_window_size: int | None = None,
2551 compression_level: int = -1,
2552) -> tuple[bytes, bytes]:
2553 """Write a new pack data file.
2555 Args:
2556 filename: Path to the new pack file (without .pack extension)
2557 objects: Objects to write to the pack
2558 delta_window_size: Delta window size
2559 deltify: Whether to deltify pack objects
2560 compression_level: the zlib compression level
2561 Returns: Tuple with checksum of pack file and index file
2562 """
2563 with GitFile(filename + ".pack", "wb") as f:
2564 entries, data_sum = write_pack_objects(
2565 f,
2566 objects,
2567 delta_window_size=delta_window_size,
2568 deltify=deltify,
2569 compression_level=compression_level,
2570 )
2571 entries_list = sorted([(k, v[0], v[1]) for (k, v) in entries.items()])
2572 with GitFile(filename + ".idx", "wb") as f:
2573 idx_sha = write_pack_index(f, entries_list, data_sum)
2574 return data_sum, idx_sha
2577def pack_header_chunks(num_objects: int) -> Iterator[bytes]:
2578 """Yield chunks for a pack header."""
2579 yield b"PACK" # Pack header
2580 yield struct.pack(b">L", 2) # Pack version
2581 yield struct.pack(b">L", num_objects) # Number of objects in pack
2584def write_pack_header(
2585 write: Callable[[bytes], int] | IO[bytes], num_objects: int
2586) -> None:
2587 """Write a pack header for the given number of objects."""
2588 write_fn: Callable[[bytes], int]
2589 if hasattr(write, "write"):
2590 write_fn = write.write
2591 warnings.warn(
2592 "write_pack_header() now takes a write rather than file argument",
2593 DeprecationWarning,
2594 stacklevel=2,
2595 )
2596 else:
2597 write_fn = write
2598 for chunk in pack_header_chunks(num_objects):
2599 write_fn(chunk)
2602def find_reusable_deltas(
2603 container: PackedObjectContainer,
2604 object_ids: Set[bytes],
2605 *,
2606 other_haves: Set[bytes] | None = None,
2607 progress: Callable[..., None] | None = None,
2608) -> Iterator[UnpackedObject]:
2609 """Find deltas in a pack that can be reused.
2611 Args:
2612 container: Pack container to search for deltas
2613 object_ids: Set of object IDs to find deltas for
2614 other_haves: Set of other object IDs we have
2615 progress: Optional progress reporting callback
2617 Returns:
2618 Iterator of UnpackedObject entries that can be reused
2619 """
2620 if other_haves is None:
2621 other_haves = set()
2622 reused = 0
2623 for i, unpacked in enumerate(
2624 container.iter_unpacked_subset(
2625 object_ids, allow_missing=True, convert_ofs_delta=True
2626 )
2627 ):
2628 if progress is not None and i % 1000 == 0:
2629 progress(f"checking for reusable deltas: {i}/{len(object_ids)}\r".encode())
2630 if unpacked.pack_type_num == REF_DELTA:
2631 hexsha = sha_to_hex(unpacked.delta_base) # type: ignore
2632 if hexsha in object_ids or hexsha in other_haves:
2633 yield unpacked
2634 reused += 1
2635 if progress is not None:
2636 progress((f"found {reused} deltas to reuse\n").encode())
2639def deltify_pack_objects(
2640 objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, bytes | None]],
2641 *,
2642 window_size: int | None = None,
2643 progress: Callable[..., None] | None = None,
2644) -> Iterator[UnpackedObject]:
2645 """Generate deltas for pack objects.
2647 Args:
2648 objects: An iterable of (object, path) tuples to deltify.
2649 window_size: Window size; None for default
2650 progress: Optional progress reporting callback
2651 Returns: Iterator over type_num, object id, delta_base, content
2652 delta_base is None for full text entries
2653 """
2655 def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, bytes | None]]]:
2656 for e in objects:
2657 if isinstance(e, ShaFile):
2658 yield (e, (e.type_num, None))
2659 else:
2660 yield (e[0], (e[0].type_num, e[1]))
2662 sorted_objs = sort_objects_for_delta(objects_with_hints())
2663 yield from deltas_from_sorted_objects(
2664 sorted_objs,
2665 window_size=window_size,
2666 progress=progress,
2667 )
2670def sort_objects_for_delta(
2671 objects: Iterator[ShaFile] | Iterator[tuple[ShaFile, PackHint | None]],
2672) -> Iterator[tuple[ShaFile, bytes | None]]:
2673 """Sort objects for optimal delta compression.
2675 Args:
2676 objects: Iterator of objects or (object, hint) tuples
2678 Returns:
2679 Iterator of sorted (ShaFile, path) tuples
2680 """
2681 magic = []
2682 for entry in objects:
2683 if isinstance(entry, tuple):
2684 obj, hint = entry
2685 if hint is None:
2686 type_num = None
2687 path = None
2688 else:
2689 (type_num, path) = hint
2690 else:
2691 obj = entry
2692 type_num = None
2693 path = None
2694 magic.append((type_num, path, -obj.raw_length(), obj))
2695 # Build a list of objects ordered by the magic Linus heuristic
2696 # This helps us find good objects to diff against us
2697 magic.sort()
2698 return ((x[3], x[1]) for x in magic)
2701def deltas_from_sorted_objects(
2702 objects: Iterator[tuple[ShaFile, bytes | None]],
2703 window_size: int | None = None,
2704 progress: Callable[..., None] | None = None,
2705) -> Iterator[UnpackedObject]:
2706 """Create deltas from sorted objects.
2708 Args:
2709 objects: Iterator of sorted objects to deltify
2710 window_size: Delta window size; None for default
2711 progress: Optional progress reporting callback
2713 Returns:
2714 Iterator of UnpackedObject entries
2715 """
2716 # TODO(jelmer): Use threads
2717 if window_size is None:
2718 window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE
2720 possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque()
2721 for i, (o, path) in enumerate(objects):
2722 if progress is not None and i % 1000 == 0:
2723 progress((f"generating deltas: {i}\r").encode())
2724 raw = o.as_raw_chunks()
2725 winner = raw
2726 winner_len = sum(map(len, winner))
2727 winner_base = None
2728 for base_id, base_type_num, base in possible_bases:
2729 if base_type_num != o.type_num:
2730 continue
2731 delta_len = 0
2732 delta = []
2733 for chunk in create_delta(b"".join(base), b"".join(raw)):
2734 delta_len += len(chunk)
2735 if delta_len >= winner_len:
2736 break
2737 delta.append(chunk)
2738 else:
2739 winner_base = base_id
2740 winner = delta
2741 winner_len = sum(map(len, winner))
2742 yield UnpackedObject(
2743 o.type_num,
2744 sha=o.sha().digest(),
2745 delta_base=winner_base,
2746 decomp_len=winner_len,
2747 decomp_chunks=winner,
2748 )
2749 possible_bases.appendleft((o.sha().digest(), o.type_num, raw))
2750 while len(possible_bases) > window_size:
2751 possible_bases.pop()
2754def pack_objects_to_data(
2755 objects: Sequence[ShaFile]
2756 | Sequence[tuple[ShaFile, bytes | None]]
2757 | Sequence[tuple[ShaFile, PackHint | None]],
2758 *,
2759 deltify: bool | None = None,
2760 delta_window_size: int | None = None,
2761 ofs_delta: bool = True,
2762 progress: Callable[..., None] | None = None,
2763) -> tuple[int, Iterator[UnpackedObject]]:
2764 """Create pack data from objects.
2766 Args:
2767 objects: Pack objects
2768 deltify: Whether to deltify pack objects
2769 delta_window_size: Delta window size
2770 ofs_delta: Whether to use offset deltas
2771 progress: Optional progress reporting callback
2772 Returns: Tuples with (type_num, hexdigest, delta base, object chunks)
2773 """
2774 count = len(objects)
2775 if deltify is None:
2776 # PERFORMANCE/TODO(jelmer): This should be enabled but the python
2777 # implementation is *much* too slow at the moment.
2778 # Maybe consider enabling it just if the rust extension is available?
2779 deltify = False
2780 if deltify:
2781 return (
2782 count,
2783 deltify_pack_objects(
2784 iter(objects), # type: ignore
2785 window_size=delta_window_size,
2786 progress=progress,
2787 ),
2788 )
2789 else:
2791 def iter_without_path() -> Iterator[UnpackedObject]:
2792 for o in objects:
2793 if isinstance(o, tuple):
2794 yield full_unpacked_object(o[0])
2795 else:
2796 yield full_unpacked_object(o)
2798 return (count, iter_without_path())
2801def generate_unpacked_objects(
2802 container: PackedObjectContainer,
2803 object_ids: Sequence[tuple[ObjectID, PackHint | None]],
2804 delta_window_size: int | None = None,
2805 deltify: bool | None = None,
2806 reuse_deltas: bool = True,
2807 ofs_delta: bool = True,
2808 other_haves: set[bytes] | None = None,
2809 progress: Callable[..., None] | None = None,
2810) -> Iterator[UnpackedObject]:
2811 """Create pack data from objects.
2813 Returns: Tuples with (type_num, hexdigest, delta base, object chunks)
2814 """
2815 todo = dict(object_ids)
2816 if reuse_deltas:
2817 for unpack in find_reusable_deltas(
2818 container, set(todo), other_haves=other_haves, progress=progress
2819 ):
2820 del todo[sha_to_hex(unpack.sha())]
2821 yield unpack
2822 if deltify is None:
2823 # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
2824 # slow at the moment.
2825 deltify = False
2826 if deltify:
2827 objects_to_delta = container.iterobjects_subset(
2828 todo.keys(), allow_missing=False
2829 )
2830 sorted_objs = sort_objects_for_delta((o, todo[o.id]) for o in objects_to_delta)
2831 yield from deltas_from_sorted_objects(
2832 sorted_objs,
2833 window_size=delta_window_size,
2834 progress=progress,
2835 )
2836 else:
2837 for oid in todo:
2838 yield full_unpacked_object(container[oid])
2841def full_unpacked_object(o: ShaFile) -> UnpackedObject:
2842 """Create an UnpackedObject from a ShaFile.
2844 Args:
2845 o: ShaFile object to convert
2847 Returns:
2848 UnpackedObject with full object data
2849 """
2850 return UnpackedObject(
2851 o.type_num,
2852 delta_base=None,
2853 crc32=None,
2854 decomp_chunks=o.as_raw_chunks(),
2855 sha=o.sha().digest(),
2856 )
2859def write_pack_from_container(
2860 write: Callable[[bytes], None]
2861 | Callable[[bytes | bytearray | memoryview], int]
2862 | IO[bytes],
2863 container: PackedObjectContainer,
2864 object_ids: Sequence[tuple[ObjectID, PackHint | None]],
2865 delta_window_size: int | None = None,
2866 deltify: bool | None = None,
2867 reuse_deltas: bool = True,
2868 compression_level: int = -1,
2869 other_haves: set[bytes] | None = None,
2870) -> tuple[dict[bytes, tuple[int, int]], bytes]:
2871 """Write a new pack data file.
2873 Args:
2874 write: write function to use
2875 container: PackedObjectContainer
2876 object_ids: Sequence of (object_id, hint) tuples to write
2877 delta_window_size: Sliding window size for searching for deltas;
2878 Set to None for default window size.
2879 deltify: Whether to deltify objects
2880 reuse_deltas: Whether to reuse existing deltas
2881 compression_level: the zlib compression level to use
2882 other_haves: Set of additional object IDs the receiver has
2883 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
2884 """
2885 pack_contents_count = len(object_ids)
2886 pack_contents = generate_unpacked_objects(
2887 container,
2888 object_ids,
2889 delta_window_size=delta_window_size,
2890 deltify=deltify,
2891 reuse_deltas=reuse_deltas,
2892 other_haves=other_haves,
2893 )
2895 return write_pack_data(
2896 write,
2897 pack_contents,
2898 num_records=pack_contents_count,
2899 compression_level=compression_level,
2900 )
2903def write_pack_objects(
2904 write: Callable[[bytes], None] | IO[bytes],
2905 objects: Sequence[ShaFile] | Sequence[tuple[ShaFile, bytes | None]],
2906 *,
2907 delta_window_size: int | None = None,
2908 deltify: bool | None = None,
2909 compression_level: int = -1,
2910) -> tuple[dict[bytes, tuple[int, int]], bytes]:
2911 """Write a new pack data file.
2913 Args:
2914 write: write function to use
2915 objects: Sequence of (object, path) tuples to write
2916 delta_window_size: Sliding window size for searching for deltas;
2917 Set to None for default window size.
2918 deltify: Whether to deltify objects
2919 compression_level: the zlib compression level to use
2920 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
2921 """
2922 pack_contents_count, pack_contents = pack_objects_to_data(objects, deltify=deltify)
2924 return write_pack_data(
2925 write,
2926 pack_contents,
2927 num_records=pack_contents_count,
2928 compression_level=compression_level,
2929 )
2932class PackChunkGenerator:
2933 """Generator for pack data chunks."""
2935 def __init__(
2936 self,
2937 num_records: int | None = None,
2938 records: Iterator[UnpackedObject] | None = None,
2939 progress: Callable[..., None] | None = None,
2940 compression_level: int = -1,
2941 reuse_compressed: bool = True,
2942 ) -> None:
2943 """Initialize PackChunkGenerator.
2945 Args:
2946 num_records: Expected number of records
2947 records: Iterator of pack records
2948 progress: Optional progress callback
2949 compression_level: Compression level (-1 for default)
2950 reuse_compressed: Whether to reuse compressed chunks
2951 """
2952 self.cs = sha1(b"")
2953 self.entries: dict[bytes, tuple[int, int]] = {}
2954 if records is None:
2955 records = iter([]) # Empty iterator if None
2956 self._it = self._pack_data_chunks(
2957 records=records,
2958 num_records=num_records,
2959 progress=progress,
2960 compression_level=compression_level,
2961 reuse_compressed=reuse_compressed,
2962 )
2964 def sha1digest(self) -> bytes:
2965 """Return the SHA1 digest of the pack data."""
2966 return self.cs.digest()
2968 def __iter__(self) -> Iterator[bytes]:
2969 """Iterate over pack data chunks."""
2970 return self._it
2972 def _pack_data_chunks(
2973 self,
2974 records: Iterator[UnpackedObject],
2975 *,
2976 num_records: int | None = None,
2977 progress: Callable[..., None] | None = None,
2978 compression_level: int = -1,
2979 reuse_compressed: bool = True,
2980 ) -> Iterator[bytes]:
2981 """Iterate pack data file chunks.
2983 Args:
2984 records: Iterator over UnpackedObject
2985 num_records: Number of records (defaults to len(records) if not specified)
2986 progress: Function to report progress to
2987 compression_level: the zlib compression level
2988 reuse_compressed: Whether to reuse compressed chunks
2989 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
2990 """
2991 # Write the pack
2992 if num_records is None:
2993 num_records = len(records) # type: ignore
2994 offset = 0
2995 for chunk in pack_header_chunks(num_records):
2996 yield chunk
2997 self.cs.update(chunk)
2998 offset += len(chunk)
2999 actual_num_records = 0
3000 for i, unpacked in enumerate(records):
3001 type_num = unpacked.pack_type_num
3002 if progress is not None and i % 1000 == 0:
3003 progress((f"writing pack data: {i}/{num_records}\r").encode("ascii"))
3004 raw: list[bytes] | tuple[int, list[bytes]] | tuple[bytes, list[bytes]]
3005 if unpacked.delta_base is not None:
3006 assert isinstance(unpacked.delta_base, bytes), (
3007 f"Expected bytes, got {type(unpacked.delta_base)}"
3008 )
3009 try:
3010 base_offset, _base_crc32 = self.entries[unpacked.delta_base]
3011 except KeyError:
3012 type_num = REF_DELTA
3013 assert isinstance(unpacked.delta_base, bytes)
3014 raw = (unpacked.delta_base, unpacked.decomp_chunks)
3015 else:
3016 type_num = OFS_DELTA
3017 raw = (offset - base_offset, unpacked.decomp_chunks)
3018 else:
3019 raw = unpacked.decomp_chunks
3020 chunks: list[bytes] | Iterator[bytes]
3021 if unpacked.comp_chunks is not None and reuse_compressed:
3022 chunks = unpacked.comp_chunks
3023 else:
3024 chunks = pack_object_chunks(
3025 type_num, raw, compression_level=compression_level
3026 )
3027 crc32 = 0
3028 object_size = 0
3029 for chunk in chunks:
3030 yield chunk
3031 crc32 = binascii.crc32(chunk, crc32)
3032 self.cs.update(chunk)
3033 object_size += len(chunk)
3034 actual_num_records += 1
3035 self.entries[unpacked.sha()] = (offset, crc32)
3036 offset += object_size
3037 if actual_num_records != num_records:
3038 raise AssertionError(
3039 f"actual records written differs: {actual_num_records} != {num_records}"
3040 )
3042 yield self.cs.digest()
3045def write_pack_data(
3046 write: Callable[[bytes], None]
3047 | Callable[[bytes | bytearray | memoryview], int]
3048 | IO[bytes],
3049 records: Iterator[UnpackedObject],
3050 *,
3051 num_records: int | None = None,
3052 progress: Callable[..., None] | None = None,
3053 compression_level: int = -1,
3054) -> tuple[dict[bytes, tuple[int, int]], bytes]:
3055 """Write a new pack data file.
3057 Args:
3058 write: Write function to use
3059 num_records: Number of records (defaults to len(records) if None)
3060 records: Iterator over type_num, object_id, delta_base, raw
3061 progress: Function to report progress to
3062 compression_level: the zlib compression level
3063 Returns: Dict mapping id -> (offset, crc32 checksum), pack checksum
3064 """
3065 chunk_generator = PackChunkGenerator(
3066 num_records=num_records,
3067 records=records,
3068 progress=progress,
3069 compression_level=compression_level,
3070 )
3071 for chunk in chunk_generator:
3072 if callable(write):
3073 write(chunk)
3074 else:
3075 write.write(chunk)
3076 return chunk_generator.entries, chunk_generator.sha1digest()
3079def write_pack_index_v1(
3080 f: IO[bytes],
3081 entries: Iterable[tuple[bytes, int, int | None]],
3082 pack_checksum: bytes,
3083) -> bytes:
3084 """Write a new pack index file.
3086 Args:
3087 f: A file-like object to write to
3088 entries: List of tuples with object name (sha), offset_in_pack,
3089 and crc32_checksum.
3090 pack_checksum: Checksum of the pack file.
3091 Returns: The SHA of the written index file
3092 """
3093 f = SHA1Writer(f)
3094 fan_out_table: dict[int, int] = defaultdict(lambda: 0)
3095 for name, _offset, _entry_checksum in entries:
3096 fan_out_table[ord(name[:1])] += 1
3097 # Fan-out table
3098 for i in range(0x100):
3099 f.write(struct.pack(">L", fan_out_table[i]))
3100 fan_out_table[i + 1] += fan_out_table[i]
3101 for name, offset, _entry_checksum in entries:
3102 if not (offset <= 0xFFFFFFFF):
3103 raise TypeError("pack format 1 only supports offsets < 2Gb")
3104 f.write(struct.pack(">L20s", offset, name))
3105 assert len(pack_checksum) == 20
3106 f.write(pack_checksum)
3107 return f.write_sha()
3110def _delta_encode_size(size: int) -> bytes:
3111 ret = bytearray()
3112 c = size & 0x7F
3113 size >>= 7
3114 while size:
3115 ret.append(c | 0x80)
3116 c = size & 0x7F
3117 size >>= 7
3118 ret.append(c)
3119 return bytes(ret)
3122# The length of delta compression copy operations in version 2 packs is limited
3123# to 64K. To copy more, we use several copy operations. Version 3 packs allow
3124# 24-bit lengths in copy operations, but we always make version 2 packs.
3125_MAX_COPY_LEN = 0xFFFF
3128def _encode_copy_operation(start: int, length: int) -> bytes:
3129 scratch = bytearray([0x80])
3130 for i in range(4):
3131 if start & 0xFF << i * 8:
3132 scratch.append((start >> i * 8) & 0xFF)
3133 scratch[0] |= 1 << i
3134 for i in range(2):
3135 if length & 0xFF << i * 8:
3136 scratch.append((length >> i * 8) & 0xFF)
3137 scratch[0] |= 1 << (4 + i)
3138 return bytes(scratch)
3141def _create_delta_py(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
3142 """Use python difflib to work out how to transform base_buf to target_buf.
3144 Args:
3145 base_buf: Base buffer
3146 target_buf: Target buffer
3147 """
3148 if isinstance(base_buf, list):
3149 base_buf = b"".join(base_buf)
3150 if isinstance(target_buf, list):
3151 target_buf = b"".join(target_buf)
3152 assert isinstance(base_buf, bytes)
3153 assert isinstance(target_buf, bytes)
3154 # write delta header
3155 yield _delta_encode_size(len(base_buf))
3156 yield _delta_encode_size(len(target_buf))
3157 # write out delta opcodes
3158 seq = SequenceMatcher(isjunk=None, a=base_buf, b=target_buf)
3159 for opcode, i1, i2, j1, j2 in seq.get_opcodes():
3160 # Git patch opcodes don't care about deletes!
3161 # if opcode == 'replace' or opcode == 'delete':
3162 # pass
3163 if opcode == "equal":
3164 # If they are equal, unpacker will use data from base_buf
3165 # Write out an opcode that says what range to use
3166 copy_start = i1
3167 copy_len = i2 - i1
3168 while copy_len > 0:
3169 to_copy = min(copy_len, _MAX_COPY_LEN)
3170 yield _encode_copy_operation(copy_start, to_copy)
3171 copy_start += to_copy
3172 copy_len -= to_copy
3173 if opcode == "replace" or opcode == "insert":
3174 # If we are replacing a range or adding one, then we just
3175 # output it to the stream (prefixed by its size)
3176 s = j2 - j1
3177 o = j1
3178 while s > 127:
3179 yield bytes([127])
3180 yield bytes(memoryview(target_buf)[o : o + 127])
3181 s -= 127
3182 o += 127
3183 yield bytes([s])
3184 yield bytes(memoryview(target_buf)[o : o + s])
3187# Default to pure Python implementation
3188create_delta = _create_delta_py
3191def apply_delta(
3192 src_buf: bytes | list[bytes], delta: bytes | list[bytes]
3193) -> list[bytes]:
3194 """Based on the similar function in git's patch-delta.c.
3196 Args:
3197 src_buf: Source buffer
3198 delta: Delta instructions
3199 """
3200 if not isinstance(src_buf, bytes):
3201 src_buf = b"".join(src_buf)
3202 if not isinstance(delta, bytes):
3203 delta = b"".join(delta)
3204 out = []
3205 index = 0
3206 delta_length = len(delta)
3208 def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]:
3209 size = 0
3210 i = 0
3211 while delta:
3212 cmd = ord(delta[index : index + 1])
3213 index += 1
3214 size |= (cmd & ~0x80) << i
3215 i += 7
3216 if not cmd & 0x80:
3217 break
3218 return size, index
3220 src_size, index = get_delta_header_size(delta, index)
3221 dest_size, index = get_delta_header_size(delta, index)
3222 if src_size != len(src_buf):
3223 raise ApplyDeltaError(
3224 f"Unexpected source buffer size: {src_size} vs {len(src_buf)}"
3225 )
3226 while index < delta_length:
3227 cmd = ord(delta[index : index + 1])
3228 index += 1
3229 if cmd & 0x80:
3230 cp_off = 0
3231 for i in range(4):
3232 if cmd & (1 << i):
3233 x = ord(delta[index : index + 1])
3234 index += 1
3235 cp_off |= x << (i * 8)
3236 cp_size = 0
3237 # Version 3 packs can contain copy sizes larger than 64K.
3238 for i in range(3):
3239 if cmd & (1 << (4 + i)):
3240 x = ord(delta[index : index + 1])
3241 index += 1
3242 cp_size |= x << (i * 8)
3243 if cp_size == 0:
3244 cp_size = 0x10000
3245 if (
3246 cp_off + cp_size < cp_size
3247 or cp_off + cp_size > src_size
3248 or cp_size > dest_size
3249 ):
3250 break
3251 out.append(src_buf[cp_off : cp_off + cp_size])
3252 elif cmd != 0:
3253 out.append(delta[index : index + cmd])
3254 index += cmd
3255 else:
3256 raise ApplyDeltaError("Invalid opcode 0")
3258 if index != delta_length:
3259 raise ApplyDeltaError(f"delta not empty: {delta[index:]!r}")
3261 if dest_size != chunks_length(out):
3262 raise ApplyDeltaError("dest size incorrect")
3264 return out
3267def write_pack_index_v2(
3268 f: IO[bytes],
3269 entries: Iterable[tuple[bytes, int, int | None]],
3270 pack_checksum: bytes,
3271) -> bytes:
3272 """Write a new pack index file.
3274 Args:
3275 f: File-like object to write to
3276 entries: List of tuples with object name (sha), offset_in_pack, and
3277 crc32_checksum.
3278 pack_checksum: Checksum of the pack file.
3279 Returns: The SHA of the index file written
3280 """
3281 f = SHA1Writer(f)
3282 f.write(b"\377tOc") # Magic!
3283 f.write(struct.pack(">L", 2))
3284 fan_out_table: dict[int, int] = defaultdict(lambda: 0)
3285 for name, offset, entry_checksum in entries:
3286 fan_out_table[ord(name[:1])] += 1
3287 # Fan-out table
3288 largetable: list[int] = []
3289 for i in range(0x100):
3290 f.write(struct.pack(b">L", fan_out_table[i]))
3291 fan_out_table[i + 1] += fan_out_table[i]
3292 for name, offset, entry_checksum in entries:
3293 f.write(name)
3294 for name, offset, entry_checksum in entries:
3295 f.write(struct.pack(b">L", entry_checksum))
3296 for name, offset, entry_checksum in entries:
3297 if offset < 2**31:
3298 f.write(struct.pack(b">L", offset))
3299 else:
3300 f.write(struct.pack(b">L", 2**31 + len(largetable)))
3301 largetable.append(offset)
3302 for offset in largetable:
3303 f.write(struct.pack(b">Q", offset))
3304 assert len(pack_checksum) == 20
3305 f.write(pack_checksum)
3306 return f.write_sha()
3309def write_pack_index_v3(
3310 f: IO[bytes],
3311 entries: Iterable[tuple[bytes, int, int | None]],
3312 pack_checksum: bytes,
3313 hash_algorithm: int = 1,
3314) -> bytes:
3315 """Write a new pack index file in v3 format.
3317 Args:
3318 f: File-like object to write to
3319 entries: List of tuples with object name (sha), offset_in_pack, and
3320 crc32_checksum.
3321 pack_checksum: Checksum of the pack file.
3322 hash_algorithm: Hash algorithm identifier (1 = SHA-1, 2 = SHA-256)
3323 Returns: The SHA of the index file written
3324 """
3325 if hash_algorithm == 1:
3326 hash_size = 20 # SHA-1
3327 writer_cls = SHA1Writer
3328 elif hash_algorithm == 2:
3329 hash_size = 32 # SHA-256
3330 # TODO: Add SHA256Writer when SHA-256 support is implemented
3331 raise NotImplementedError("SHA-256 support not yet implemented")
3332 else:
3333 raise ValueError(f"Unknown hash algorithm {hash_algorithm}")
3335 # Convert entries to list to allow multiple iterations
3336 entries_list = list(entries)
3338 # Calculate shortest unambiguous prefix length for object names
3339 # For now, use full hash size (this could be optimized)
3340 shortened_oid_len = hash_size
3342 f = writer_cls(f)
3343 f.write(b"\377tOc") # Magic!
3344 f.write(struct.pack(">L", 3)) # Version 3
3345 f.write(struct.pack(">L", hash_algorithm)) # Hash algorithm
3346 f.write(struct.pack(">L", shortened_oid_len)) # Shortened OID length
3348 fan_out_table: dict[int, int] = defaultdict(lambda: 0)
3349 for name, offset, entry_checksum in entries_list:
3350 if len(name) != hash_size:
3351 raise ValueError(
3352 f"Object name has wrong length: expected {hash_size}, got {len(name)}"
3353 )
3354 fan_out_table[ord(name[:1])] += 1
3356 # Fan-out table
3357 largetable: list[int] = []
3358 for i in range(0x100):
3359 f.write(struct.pack(b">L", fan_out_table[i]))
3360 fan_out_table[i + 1] += fan_out_table[i]
3362 # Object names table
3363 for name, offset, entry_checksum in entries_list:
3364 f.write(name)
3366 # CRC32 checksums table
3367 for name, offset, entry_checksum in entries_list:
3368 f.write(struct.pack(b">L", entry_checksum))
3370 # Offset table
3371 for name, offset, entry_checksum in entries_list:
3372 if offset < 2**31:
3373 f.write(struct.pack(b">L", offset))
3374 else:
3375 f.write(struct.pack(b">L", 2**31 + len(largetable)))
3376 largetable.append(offset)
3378 # Large offset table
3379 for offset in largetable:
3380 f.write(struct.pack(b">Q", offset))
3382 assert len(pack_checksum) == hash_size, (
3383 f"Pack checksum has wrong length: expected {hash_size}, got {len(pack_checksum)}"
3384 )
3385 f.write(pack_checksum)
3386 return f.write_sha()
3389def write_pack_index(
3390 f: IO[bytes],
3391 entries: Iterable[tuple[bytes, int, int | None]],
3392 pack_checksum: bytes,
3393 progress: Callable[..., None] | None = None,
3394 version: int | None = None,
3395) -> bytes:
3396 """Write a pack index file.
3398 Args:
3399 f: File-like object to write to.
3400 entries: List of (checksum, offset, crc32) tuples
3401 pack_checksum: Checksum of the pack file.
3402 progress: Progress function (not currently used)
3403 version: Pack index version to use (1, 2, or 3). If None, defaults to DEFAULT_PACK_INDEX_VERSION.
3405 Returns:
3406 SHA of the written index file
3407 """
3408 if version is None:
3409 version = DEFAULT_PACK_INDEX_VERSION
3411 if version == 1:
3412 return write_pack_index_v1(f, entries, pack_checksum)
3413 elif version == 2:
3414 return write_pack_index_v2(f, entries, pack_checksum)
3415 elif version == 3:
3416 return write_pack_index_v3(f, entries, pack_checksum)
3417 else:
3418 raise ValueError(f"Unsupported pack index version: {version}")
3421class Pack:
3422 """A Git pack object."""
3424 _data_load: Callable[[], PackData] | None
3425 _idx_load: Callable[[], PackIndex] | None
3427 _data: PackData | None
3428 _idx: PackIndex | None
3429 _bitmap: "PackBitmap | None"
3431 def __init__(
3432 self,
3433 basename: str,
3434 resolve_ext_ref: ResolveExtRefFn | None = None,
3435 *,
3436 delta_window_size: int | None = None,
3437 window_memory: int | None = None,
3438 delta_cache_size: int | None = None,
3439 depth: int | None = None,
3440 threads: int | None = None,
3441 big_file_threshold: int | None = None,
3442 ) -> None:
3443 """Initialize a Pack object.
3445 Args:
3446 basename: Base path for pack files (without .pack/.idx extension)
3447 resolve_ext_ref: Optional function to resolve external references
3448 delta_window_size: Size of the delta compression window
3449 window_memory: Memory limit for delta compression window
3450 delta_cache_size: Size of the delta cache
3451 depth: Maximum depth for delta chains
3452 threads: Number of threads to use for operations
3453 big_file_threshold: Size threshold for big file handling
3454 """
3455 self._basename = basename
3456 self._data = None
3457 self._idx = None
3458 self._bitmap = None
3459 self._idx_path = self._basename + ".idx"
3460 self._data_path = self._basename + ".pack"
3461 self._bitmap_path = self._basename + ".bitmap"
3462 self.delta_window_size = delta_window_size
3463 self.window_memory = window_memory
3464 self.delta_cache_size = delta_cache_size
3465 self.depth = depth
3466 self.threads = threads
3467 self.big_file_threshold = big_file_threshold
3468 self._data_load = lambda: PackData(
3469 self._data_path,
3470 delta_window_size=delta_window_size,
3471 window_memory=window_memory,
3472 delta_cache_size=delta_cache_size,
3473 depth=depth,
3474 threads=threads,
3475 big_file_threshold=big_file_threshold,
3476 )
3477 self._idx_load = lambda: load_pack_index(self._idx_path)
3478 self.resolve_ext_ref = resolve_ext_ref
3480 @classmethod
3481 def from_lazy_objects(
3482 cls, data_fn: Callable[[], PackData], idx_fn: Callable[[], PackIndex]
3483 ) -> "Pack":
3484 """Create a new pack object from callables to load pack data and index objects."""
3485 ret = cls("")
3486 ret._data_load = data_fn
3487 ret._idx_load = idx_fn
3488 return ret
3490 @classmethod
3491 def from_objects(cls, data: PackData, idx: PackIndex) -> "Pack":
3492 """Create a new pack object from pack data and index objects."""
3493 ret = cls("")
3494 ret._data = data
3495 ret._data_load = None
3496 ret._idx = idx
3497 ret._idx_load = None
3498 ret.check_length_and_checksum()
3499 return ret
3501 def name(self) -> bytes:
3502 """The SHA over the SHAs of the objects in this pack."""
3503 return self.index.objects_sha1()
3505 @property
3506 def data(self) -> PackData:
3507 """The pack data object being used."""
3508 if self._data is None:
3509 assert self._data_load
3510 self._data = self._data_load()
3511 self.check_length_and_checksum()
3512 return self._data
3514 @property
3515 def index(self) -> PackIndex:
3516 """The index being used.
3518 Note: This may be an in-memory index
3519 """
3520 if self._idx is None:
3521 assert self._idx_load
3522 self._idx = self._idx_load()
3523 return self._idx
3525 @property
3526 def bitmap(self) -> Optional["PackBitmap"]:
3527 """The bitmap being used, if available.
3529 Returns:
3530 PackBitmap instance or None if no bitmap exists
3532 Raises:
3533 ValueError: If bitmap file is invalid or corrupt
3534 """
3535 if self._bitmap is None:
3536 from .bitmap import read_bitmap
3538 self._bitmap = read_bitmap(self._bitmap_path, pack_index=self.index)
3539 return self._bitmap
3541 def close(self) -> None:
3542 """Close the pack file and index."""
3543 if self._data is not None:
3544 self._data.close()
3545 if self._idx is not None:
3546 self._idx.close()
3548 def __enter__(self) -> "Pack":
3549 """Enter context manager."""
3550 return self
3552 def __exit__(
3553 self,
3554 exc_type: type | None,
3555 exc_val: BaseException | None,
3556 exc_tb: TracebackType | None,
3557 ) -> None:
3558 """Exit context manager."""
3559 self.close()
3561 def __eq__(self, other: object) -> bool:
3562 """Check equality with another pack."""
3563 if not isinstance(other, Pack):
3564 return False
3565 return self.index == other.index
3567 def __len__(self) -> int:
3568 """Number of entries in this pack."""
3569 return len(self.index)
3571 def __repr__(self) -> str:
3572 """Return string representation of this pack."""
3573 return f"{self.__class__.__name__}({self._basename!r})"
3575 def __iter__(self) -> Iterator[bytes]:
3576 """Iterate over all the sha1s of the objects in this pack."""
3577 return iter(self.index)
3579 def check_length_and_checksum(self) -> None:
3580 """Sanity check the length and checksum of the pack index and data."""
3581 assert len(self.index) == len(self.data), (
3582 f"Length mismatch: {len(self.index)} (index) != {len(self.data)} (data)"
3583 )
3584 idx_stored_checksum = self.index.get_pack_checksum()
3585 data_stored_checksum = self.data.get_stored_checksum()
3586 if (
3587 idx_stored_checksum is not None
3588 and idx_stored_checksum != data_stored_checksum
3589 ):
3590 raise ChecksumMismatch(
3591 sha_to_hex(idx_stored_checksum),
3592 sha_to_hex(data_stored_checksum),
3593 )
3595 def check(self) -> None:
3596 """Check the integrity of this pack.
3598 Raises:
3599 ChecksumMismatch: if a checksum for the index or data is wrong
3600 """
3601 self.index.check()
3602 self.data.check()
3603 for obj in self.iterobjects():
3604 obj.check()
3605 # TODO: object connectivity checks
3607 def get_stored_checksum(self) -> bytes:
3608 """Return the stored checksum of the pack data."""
3609 return self.data.get_stored_checksum()
3611 def pack_tuples(self) -> list[tuple[ShaFile, None]]:
3612 """Return pack tuples for all objects in pack."""
3613 return [(o, None) for o in self.iterobjects()]
3615 def __contains__(self, sha1: bytes) -> bool:
3616 """Check whether this pack contains a particular SHA1."""
3617 try:
3618 self.index.object_offset(sha1)
3619 return True
3620 except KeyError:
3621 return False
3623 def get_raw(self, sha1: bytes) -> tuple[int, bytes]:
3624 """Get raw object data by SHA1."""
3625 offset = self.index.object_offset(sha1)
3626 obj_type, obj = self.data.get_object_at(offset)
3627 type_num, chunks = self.resolve_object(offset, obj_type, obj)
3628 return type_num, b"".join(chunks) # type: ignore[arg-type]
3630 def __getitem__(self, sha1: bytes) -> ShaFile:
3631 """Retrieve the specified SHA1."""
3632 type, uncomp = self.get_raw(sha1)
3633 return ShaFile.from_raw_string(type, uncomp, sha=sha1)
3635 def iterobjects(self) -> Iterator[ShaFile]:
3636 """Iterate over the objects in this pack."""
3637 return iter(
3638 PackInflater.for_pack_data(self.data, resolve_ext_ref=self.resolve_ext_ref)
3639 )
3641 def iterobjects_subset(
3642 self, shas: Iterable[ObjectID], *, allow_missing: bool = False
3643 ) -> Iterator[ShaFile]:
3644 """Iterate over a subset of objects in this pack."""
3645 return (
3646 uo
3647 for uo in PackInflater.for_pack_subset(
3648 self,
3649 shas,
3650 allow_missing=allow_missing,
3651 resolve_ext_ref=self.resolve_ext_ref,
3652 )
3653 if uo.id in shas
3654 )
3656 def iter_unpacked_subset(
3657 self,
3658 shas: Iterable[ObjectID],
3659 *,
3660 include_comp: bool = False,
3661 allow_missing: bool = False,
3662 convert_ofs_delta: bool = False,
3663 ) -> Iterator[UnpackedObject]:
3664 """Iterate over unpacked objects in subset."""
3665 ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list)
3666 ofs: dict[int, bytes] = {}
3667 todo = set(shas)
3668 for unpacked in self.iter_unpacked(include_comp=include_comp):
3669 sha = unpacked.sha()
3670 if unpacked.offset is not None:
3671 ofs[unpacked.offset] = sha
3672 hexsha = sha_to_hex(sha)
3673 if hexsha in todo:
3674 if unpacked.pack_type_num == OFS_DELTA:
3675 assert isinstance(unpacked.delta_base, int)
3676 assert unpacked.offset is not None
3677 base_offset = unpacked.offset - unpacked.delta_base
3678 try:
3679 unpacked.delta_base = ofs[base_offset]
3680 except KeyError:
3681 ofs_pending[base_offset].append(unpacked)
3682 continue
3683 else:
3684 unpacked.pack_type_num = REF_DELTA
3685 yield unpacked
3686 todo.remove(hexsha)
3687 if unpacked.offset is not None:
3688 for child in ofs_pending.pop(unpacked.offset, []):
3689 child.pack_type_num = REF_DELTA
3690 child.delta_base = sha
3691 yield child
3692 assert not ofs_pending
3693 if not allow_missing and todo:
3694 raise UnresolvedDeltas(list(todo))
3696 def iter_unpacked(self, include_comp: bool = False) -> Iterator[UnpackedObject]:
3697 """Iterate over all unpacked objects in this pack."""
3698 ofs_to_entries = {
3699 ofs: (sha, crc32) for (sha, ofs, crc32) in self.index.iterentries()
3700 }
3701 for unpacked in self.data.iter_unpacked(include_comp=include_comp):
3702 assert unpacked.offset is not None
3703 (sha, crc32) = ofs_to_entries[unpacked.offset]
3704 unpacked._sha = sha
3705 unpacked.crc32 = crc32
3706 yield unpacked
3708 def keep(self, msg: bytes | None = None) -> str:
3709 """Add a .keep file for the pack, preventing git from garbage collecting it.
3711 Args:
3712 msg: A message written inside the .keep file; can be used later
3713 to determine whether or not a .keep file is obsolete.
3714 Returns: The path of the .keep file, as a string.
3715 """
3716 keepfile_name = f"{self._basename}.keep"
3717 with GitFile(keepfile_name, "wb") as keepfile:
3718 if msg:
3719 keepfile.write(msg)
3720 keepfile.write(b"\n")
3721 return keepfile_name
3723 def get_ref(self, sha: bytes) -> tuple[int | None, int, OldUnpackedObject]:
3724 """Get the object for a ref SHA, only looking in this pack."""
3725 # TODO: cache these results
3726 try:
3727 offset = self.index.object_offset(sha)
3728 except KeyError:
3729 offset = None
3730 if offset:
3731 type, obj = self.data.get_object_at(offset)
3732 elif self.resolve_ext_ref:
3733 type, obj = self.resolve_ext_ref(sha)
3734 else:
3735 raise KeyError(sha)
3736 return offset, type, obj
3738 def resolve_object(
3739 self,
3740 offset: int,
3741 type: int,
3742 obj: OldUnpackedObject,
3743 get_ref: Callable[[bytes], tuple[int | None, int, OldUnpackedObject]]
3744 | None = None,
3745 ) -> tuple[int, OldUnpackedObject]:
3746 """Resolve an object, possibly resolving deltas when necessary.
3748 Returns: Tuple with object type and contents.
3749 """
3750 # Walk down the delta chain, building a stack of deltas to reach
3751 # the requested object.
3752 base_offset = offset
3753 base_type = type
3754 base_obj = obj
3755 delta_stack = []
3756 while base_type in DELTA_TYPES:
3757 prev_offset = base_offset
3758 if get_ref is None:
3759 get_ref = self.get_ref
3760 if base_type == OFS_DELTA:
3761 (delta_offset, delta) = base_obj
3762 # TODO: clean up asserts and replace with nicer error messages
3763 assert isinstance(delta_offset, int), (
3764 f"Expected int, got {delta_offset.__class__}"
3765 )
3766 base_offset = base_offset - delta_offset
3767 base_type, base_obj = self.data.get_object_at(base_offset)
3768 assert isinstance(base_type, int)
3769 elif base_type == REF_DELTA:
3770 (basename, delta) = base_obj
3771 assert isinstance(basename, bytes) and len(basename) == 20
3772 base_offset, base_type, base_obj = get_ref(basename) # type: ignore[assignment]
3773 assert isinstance(base_type, int)
3774 if base_offset == prev_offset: # object is based on itself
3775 raise UnresolvedDeltas([basename])
3776 delta_stack.append((prev_offset, base_type, delta))
3778 # Now grab the base object (mustn't be a delta) and apply the
3779 # deltas all the way up the stack.
3780 chunks = base_obj
3781 for prev_offset, _delta_type, delta in reversed(delta_stack):
3782 # Convert chunks to bytes for apply_delta if needed
3783 if isinstance(chunks, list):
3784 chunks_bytes = b"".join(chunks)
3785 elif isinstance(chunks, tuple):
3786 # For tuple type, second element is the actual data
3787 _, chunk_data = chunks
3788 if isinstance(chunk_data, list):
3789 chunks_bytes = b"".join(chunk_data)
3790 else:
3791 chunks_bytes = chunk_data
3792 else:
3793 chunks_bytes = chunks
3795 # Apply delta and get result as list
3796 chunks = apply_delta(chunks_bytes, delta)
3798 if prev_offset is not None:
3799 self.data._offset_cache[prev_offset] = base_type, chunks
3800 return base_type, chunks
3802 def entries(
3803 self, progress: Callable[[int, int], None] | None = None
3804 ) -> Iterator[PackIndexEntry]:
3805 """Yield entries summarizing the contents of this pack.
3807 Args:
3808 progress: Progress function, called with current and total
3809 object count.
3810 Returns: iterator of tuples with (sha, offset, crc32)
3811 """
3812 return self.data.iterentries(
3813 progress=progress, resolve_ext_ref=self.resolve_ext_ref
3814 )
3816 def sorted_entries(
3817 self, progress: ProgressFn | None = None
3818 ) -> Iterator[PackIndexEntry]:
3819 """Return entries in this pack, sorted by SHA.
3821 Args:
3822 progress: Progress function, called with current and total
3823 object count
3824 Returns: Iterator of tuples with (sha, offset, crc32)
3825 """
3826 return iter(
3827 self.data.sorted_entries(
3828 progress=progress, resolve_ext_ref=self.resolve_ext_ref
3829 )
3830 )
3832 def get_unpacked_object(
3833 self, sha: bytes, *, include_comp: bool = False, convert_ofs_delta: bool = True
3834 ) -> UnpackedObject:
3835 """Get the unpacked object for a sha.
3837 Args:
3838 sha: SHA of object to fetch
3839 include_comp: Whether to include compression data in UnpackedObject
3840 convert_ofs_delta: Whether to convert offset deltas to ref deltas
3841 """
3842 offset = self.index.object_offset(sha)
3843 unpacked = self.data.get_unpacked_object_at(offset, include_comp=include_comp)
3844 if unpacked.pack_type_num == OFS_DELTA and convert_ofs_delta:
3845 assert isinstance(unpacked.delta_base, int)
3846 unpacked.delta_base = self.index.object_sha1(offset - unpacked.delta_base)
3847 unpacked.pack_type_num = REF_DELTA
3848 return unpacked
3851def extend_pack(
3852 f: BinaryIO,
3853 object_ids: Set[ObjectID],
3854 get_raw: Callable[[ObjectID], tuple[int, bytes]],
3855 *,
3856 compression_level: int = -1,
3857 progress: Callable[[bytes], None] | None = None,
3858) -> tuple[bytes, list[tuple[bytes, int, int]]]:
3859 """Extend a pack file with more objects.
3861 The caller should make sure that object_ids does not contain any objects
3862 that are already in the pack
3863 """
3864 # Update the header with the new number of objects.
3865 f.seek(0)
3866 _version, num_objects = read_pack_header(f.read)
3868 if object_ids:
3869 f.seek(0)
3870 write_pack_header(f.write, num_objects + len(object_ids))
3872 # Must flush before reading (http://bugs.python.org/issue3207)
3873 f.flush()
3875 # Rescan the rest of the pack, computing the SHA with the new header.
3876 new_sha = compute_file_sha(f, end_ofs=-20)
3878 # Must reposition before writing (http://bugs.python.org/issue3207)
3879 f.seek(0, os.SEEK_CUR)
3881 extra_entries = []
3883 # Complete the pack.
3884 for i, object_id in enumerate(object_ids):
3885 if progress is not None:
3886 progress(
3887 (f"writing extra base objects: {i}/{len(object_ids)}\r").encode("ascii")
3888 )
3889 assert len(object_id) == 20
3890 type_num, data = get_raw(object_id)
3891 offset = f.tell()
3892 crc32 = write_pack_object(
3893 f.write,
3894 type_num,
3895 [data], # Convert bytes to list[bytes]
3896 sha=new_sha,
3897 compression_level=compression_level,
3898 )
3899 extra_entries.append((object_id, offset, crc32))
3900 pack_sha = new_sha.digest()
3901 f.write(pack_sha)
3902 return pack_sha, extra_entries
3905try:
3906 from dulwich._pack import ( # type: ignore
3907 apply_delta,
3908 bisect_find_sha,
3909 )
3910except ImportError:
3911 pass
3913# Try to import the Rust version of create_delta
3914try:
3915 from dulwich._pack import create_delta as _create_delta_rs
3916except ImportError:
3917 pass
3918else:
3919 # Wrap the Rust version to match the Python API (returns bytes instead of Iterator)
3920 def _create_delta_rs_wrapper(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
3921 """Wrapper for Rust create_delta to match Python API."""
3922 yield _create_delta_rs(base_buf, target_buf)
3924 create_delta = _create_delta_rs_wrapper