1# SPDX-FileCopyrightText: 2015 Eric Larson
2#
3# SPDX-License-Identifier: Apache-2.0
4from __future__ import annotations
5
6import mmap
7from tempfile import NamedTemporaryFile
8from typing import TYPE_CHECKING, Any, Callable
9
10if TYPE_CHECKING:
11 from collections.abc import Buffer
12 from http.client import HTTPResponse
13
14
15class CallbackFileWrapper:
16 """
17 Small wrapper around a fp object which will tee everything read into a
18 buffer, and when that file is closed it will execute a callback with the
19 contents of that buffer.
20
21 All attributes are proxied to the underlying file object.
22
23 This class uses members with a double underscore (__) leading prefix so as
24 not to accidentally shadow an attribute.
25
26 The data is stored in a temporary file until it is all available. As long
27 as the temporary files directory is disk-based (sometimes it's a
28 memory-backed-``tmpfs`` on Linux), data will be unloaded to disk if memory
29 pressure is high. For small files the disk usually won't be used at all,
30 it'll all be in the filesystem memory cache, so there should be no
31 performance impact.
32 """
33
34 def __init__(
35 self, fp: HTTPResponse, callback: Callable[[Buffer], None] | None
36 ) -> None:
37 self.__buf = NamedTemporaryFile("rb+", delete=True)
38 self.__fp = fp
39 self.__callback = callback
40
41 def __getattr__(self, name: str) -> Any:
42 # The vagaries of garbage collection means that self.__fp is
43 # not always set. By using __getattribute__ and the private
44 # name[0] allows looking up the attribute value and raising an
45 # AttributeError when it doesn't exist. This stop things from
46 # infinitely recursing calls to getattr in the case where
47 # self.__fp hasn't been set.
48 #
49 # [0] https://docs.python.org/2/reference/expressions.html#atom-identifiers
50 fp = self.__getattribute__("_CallbackFileWrapper__fp")
51 return getattr(fp, name)
52
53 def __is_fp_closed(self) -> bool:
54 try:
55 return self.__fp.fp is None
56
57 except AttributeError:
58 pass
59
60 try:
61 closed: bool = self.__fp.closed
62 return closed
63
64 except AttributeError:
65 pass
66
67 # We just don't cache it then.
68 # TODO: Add some logging here...
69 return False
70
71 def _close(self) -> None:
72 result: Buffer
73 if self.__callback:
74 if self.__buf.tell() == 0:
75 # Empty file:
76 result = b""
77 else:
78 # Return the data without actually loading it into memory,
79 # relying on Python's buffer API and mmap(). mmap() just gives
80 # a view directly into the filesystem's memory cache, so it
81 # doesn't result in duplicate memory use.
82 self.__buf.seek(0, 0)
83 result = memoryview(
84 mmap.mmap(self.__buf.fileno(), 0, access=mmap.ACCESS_READ)
85 )
86 self.__callback(result)
87
88 # We assign this to None here, because otherwise we can get into
89 # really tricky problems where the CPython interpreter dead locks
90 # because the callback is holding a reference to something which
91 # has a __del__ method. Setting this to None breaks the cycle
92 # and allows the garbage collector to do it's thing normally.
93 self.__callback = None
94
95 # Closing the temporary file releases memory and frees disk space.
96 # Important when caching big files.
97 self.__buf.close()
98
99 def read(self, amt: int | None = None) -> bytes:
100 data: bytes = self.__fp.read(amt)
101 if data:
102 # We may be dealing with b'', a sign that things are over:
103 # it's passed e.g. after we've already closed self.__buf.
104 self.__buf.write(data)
105 if self.__is_fp_closed():
106 self._close()
107
108 return data
109
110 def _safe_read(self, amt: int) -> bytes:
111 data: bytes = self.__fp._safe_read(amt) # type: ignore[attr-defined]
112 if amt == 2 and data == b"\r\n":
113 # urllib executes this read to toss the CRLF at the end
114 # of the chunk.
115 return data
116
117 self.__buf.write(data)
118 if self.__is_fp_closed():
119 self._close()
120
121 return data