1"""
2Patched ``BZ2File`` and ``LZMAFile`` to handle pickle protocol 5.
3"""
4
5from __future__ import annotations
6
7import bz2
8from pickle import PickleBuffer
9
10from pandas.compat._constants import PY310
11
# Record whether `lzma` can be imported; `has_lzma` is consulted further
# down so that `LZMAFile` is only defined when the import succeeded.
try:
    import lzma

    has_lzma = True
except ImportError:
    has_lzma = False
18
19
20def flatten_buffer(
21 b: bytes | bytearray | memoryview | PickleBuffer,
22) -> bytes | bytearray | memoryview:
23 """
24 Return some 1-D `uint8` typed buffer.
25
26 Coerces anything that does not match that description to one that does
27 without copying if possible (otherwise will copy).
28 """
29
30 if isinstance(b, (bytes, bytearray)):
31 return b
32
33 if not isinstance(b, PickleBuffer):
34 b = PickleBuffer(b)
35
36 try:
37 # coerce to 1-D `uint8` C-contiguous `memoryview` zero-copy
38 return b.raw()
39 except BufferError:
40 # perform in-memory copy if buffer is not contiguous
41 return memoryview(b).tobytes("A")
42
43
class BZ2File(bz2.BZ2File):
    """
    ``bz2.BZ2File`` subclass that flattens buffers before writing them.

    The ``write`` override is only defined when ``PY310`` is false.
    """

    if not PY310:

        def write(self, b) -> int:
            # `bz2.BZ2File.write` assumes `len(b)` is the number of bytes
            # in `b`. Flatten `b` first, copying as little as possible, so
            # that the assumption holds.
            #
            # Note: Python 3.10 fixes this upstream.
            flat = flatten_buffer(b)
            return super().write(flat)
55
56
if has_lzma:

    class LZMAFile(lzma.LZMAFile):
        """
        ``lzma.LZMAFile`` subclass that flattens buffers before writing them.

        The ``write`` override is only defined when ``PY310`` is false.
        """

        if not PY310:

            def write(self, b) -> int:
                # `lzma.LZMAFile.write` assumes `len(b)` is the number of
                # bytes in `b`. Flatten `b` first, copying as little as
                # possible, so that the assumption holds.
                #
                # Note: Python 3.10 fixes this upstream.
                flat = flatten_buffer(b)
                return super().write(flat)