1from __future__ import annotations
2
3import collections.abc as cabc
4import typing as t
5from datetime import datetime
6
7if t.TYPE_CHECKING:
8 import typing_extensions as te
9
10T = t.TypeVar("T")
11
12
class IfRange:
    """Parsed form of the ``If-Range`` header.

    An instance is meant to carry an etag, a date, or neither of the two.
    Nothing here enforces that only one is set; when both are present the
    date takes precedence on serialization.

    .. versionadded:: 0.7
    """

    def __init__(self, etag: str | None = None, date: datetime | None = None):
        #: The etag, parsed and unquoted. Ranges always operate on strong
        #: etags, so no weakness information is kept.
        self.etag = etag
        #: The parsed date, or `None`.
        self.date = date

    def to_header(self) -> str:
        """Serialize the object back into an HTTP header value.

        :return: The HTTP date if a date is set, otherwise the quoted etag
            if an etag is set, otherwise an empty string.
        """
        if self.date is None:
            if self.etag is None:
                return ""
            return http.quote_etag(self.etag)
        return http.http_date(self.date)

    def __str__(self) -> str:
        """Return the header value, same as :meth:`to_header`."""
        return self.to_header()

    def __repr__(self) -> str:
        """Return ``<ClassName 'header value'>`` for debugging."""
        header = str(self)
        return f"<{type(self).__name__} {header!r}>"
41
42
class Range:
    """Represents a ``Range`` header. The methods only support bytes as
    the unit. A list of ranges is stored if given, but the methods only
    work when exactly one range is provided.

    :raise ValueError: If the ranges provided are invalid.

    .. versionchanged:: 0.15
        The ranges passed in are validated.

    .. versionadded:: 0.7
    """

    def __init__(
        self, units: str, ranges: cabc.Sequence[tuple[int, int | None]]
    ) -> None:
        #: The units of this range. Usually "bytes".
        self.units = units
        #: A list of ``(begin, end)`` tuples for the range header provided.
        #: The ranges are non-inclusive.
        self.ranges = ranges

        for begin, stop in ranges:
            if begin is None:
                invalid = True
            elif stop is None:
                # Open-ended range; a negative begin is accepted here.
                invalid = False
            else:
                invalid = begin < 0 or begin >= stop

            if invalid:
                raise ValueError(f"{(begin, stop)} is not a valid range.")

    def range_for_length(self, length: int | None) -> tuple[int, int] | None:
        """Resolve the single stored range against a content length.

        :param length: Total length of the content, or `None` if unknown.
        :return: A ``(start, stop)`` tuple when the unit is ``"bytes"``,
            ``length`` is not `None`, exactly one range is stored and
            ``http.is_byte_range_valid`` accepts it; otherwise `None`.
            ``stop`` is capped at ``length``.
        """
        if self.units != "bytes":
            return None
        if length is None:
            return None
        if len(self.ranges) != 1:
            return None

        first, last = self.ranges[0]

        if last is None:
            # Open-ended range: it runs to the end of the content, and a
            # negative begin is an offset counted from the end.
            last = length
            if first < 0:
                first += length

        if not http.is_byte_range_valid(first, last, length):
            return None

        return first, min(last, length)

    def make_content_range(self, length: int | None) -> ContentRange | None:
        """Creates a :class:`~werkzeug.datastructures.ContentRange` object
        from the current range and the given content length, or returns
        `None` when :meth:`range_for_length` yields no range.
        """
        span = self.range_for_length(length)
        if span is None:
            return None
        return ContentRange(self.units, span[0], span[1], length)

    def to_header(self) -> str:
        """Converts the object back into an HTTP header."""

        def render(begin: int, end: int | None) -> str:
            # Stored ends are non-inclusive; the header uses inclusive ends.
            if end is not None:
                return f"{begin}-{end - 1}"
            if begin >= 0:
                return f"{begin}-"
            return str(begin)

        specs = ",".join(render(begin, end) for begin, end in self.ranges)
        return f"{self.units}={specs}"

    def to_content_range_header(self, length: int | None) -> str | None:
        """Converts the object into a `Content-Range` HTTP header value
        based on the given length, or returns `None` when
        :meth:`range_for_length` yields no range.
        """
        span = self.range_for_length(length)
        if span is None:
            return None
        return f"{self.units} {span[0]}-{span[1] - 1}/{length}"

    def __str__(self) -> str:
        """Return the header value, same as :meth:`to_header`."""
        return self.to_header()

    def __repr__(self) -> str:
        """Return ``<ClassName 'header value'>`` for debugging."""
        header = str(self)
        return f"<{type(self).__name__} {header!r}>"
118
119
class _CallbackProperty(t.Generic[T]):
    """Descriptor that keeps its value in the owning instance's
    ``__dict__`` and calls the instance's ``on_update`` callback, if one
    is set, after every assignment.
    """

    def __set_name__(self, owner: type[ContentRange], name: str) -> None:
        # The value is stored under the underscore-prefixed attribute name.
        self.attr = "_" + name

    @t.overload
    def __get__(self, instance: None, owner: None) -> te.Self: ...
    @t.overload
    def __get__(self, instance: ContentRange, owner: type[ContentRange]) -> T: ...
    def __get__(
        self, instance: ContentRange | None, owner: type[ContentRange] | None
    ) -> te.Self | T:
        """Return the stored value, or the descriptor itself when accessed
        on the class rather than an instance.
        """
        if instance is not None:
            return instance.__dict__[self.attr]  # type: ignore[no-any-return]

        return self

    def __set__(self, instance: ContentRange, value: T) -> None:
        """Store ``value`` on the instance, then notify ``on_update``."""
        storage = instance.__dict__
        storage[self.attr] = value

        if instance.on_update is None:
            return

        instance.on_update(instance)
141
142
class ContentRange:
    """Represents the content range header.

    Assigning to :attr:`units`, :attr:`start`, :attr:`stop` or
    :attr:`length`, or calling :meth:`set`, invokes the ``on_update``
    callback (if one was given) with this object as its only argument.

    .. versionadded:: 0.7
    """

    def __init__(
        self,
        units: str | None,
        start: int | None,
        stop: int | None,
        length: int | None = None,
        on_update: cabc.Callable[[ContentRange], None] | None = None,
    ) -> None:
        # on_update must exist before set() runs, because set() reads it.
        self.on_update = on_update
        self.set(start, stop, length, units)

    #: The units to use, usually "bytes"
    units: str | None = _CallbackProperty()  # type: ignore[assignment]
    #: The start point of the range or `None`.
    start: int | None = _CallbackProperty()  # type: ignore[assignment]
    #: The stop point of the range (non-inclusive) or `None`. Can only be
    #: `None` if also start is `None`.
    stop: int | None = _CallbackProperty()  # type: ignore[assignment]
    #: The length of the range or `None`.
    length: int | None = _CallbackProperty()  # type: ignore[assignment]

    def set(
        self,
        start: int | None,
        stop: int | None,
        length: int | None = None,
        units: str | None = "bytes",
    ) -> None:
        """Update all range values at once.

        The values are written directly to the backing attributes, so
        ``on_update`` is called once rather than once per attribute.

        :param start: The start point of the range or `None`.
        :param stop: The non-inclusive stop point of the range or `None`.
        :param length: The total length or `None`.
        :param units: The units to use, ``"bytes"`` by default.
        :raise AssertionError: If ``http.is_byte_range_valid`` rejects the
            given ``start``, ``stop`` and ``length``.
        """
        # Raised explicitly instead of using an ``assert`` statement so the
        # validation is not stripped when Python runs with ``-O``. The
        # exception type and message are the same as before.
        if not http.is_byte_range_valid(start, stop, length):
            raise AssertionError("Bad range provided")

        self._units: str | None = units
        self._start: int | None = start
        self._stop: int | None = stop
        self._length: int | None = length

        if self.on_update is not None:
            self.on_update(self)

    def unset(self) -> None:
        """Sets the units to `None` which indicates that the header should
        no longer be used.
        """
        self.set(None, None, units=None)

    def to_header(self) -> str:
        """Converts the object into a ``Content-Range`` header value.

        :return: An empty string when the units are `None`. Otherwise
            ``"<units> */<length>"`` when no start is set, or
            ``"<units> <start>-<stop - 1>/<length>"``, with ``*`` standing
            in for an unknown length.
        """
        if self._units is None:
            return ""
        if self._length is None:
            length: str | int = "*"
        else:
            length = self._length
        if self._start is None:
            return f"{self._units} */{length}"
        # The stored stop is non-inclusive; the header uses an inclusive end.
        return f"{self._units} {self._start}-{self._stop - 1}/{length}"  # type: ignore[operator]

    def __bool__(self) -> bool:
        """Return `True` while units are set, i.e. the header is in use."""
        return self._units is not None

    def __str__(self) -> str:
        """Return the header value, same as :meth:`to_header`."""
        return self.to_header()

    def __repr__(self) -> str:
        """Return ``<ClassName 'header value'>`` for debugging."""
        return f"<{type(self).__name__} {str(self)!r}>"
211
212
213# circular dependencies
214from .. import http