1from __future__ import annotations
2
3import collections.abc as cabc
4import typing as t
5from datetime import datetime
6
7from .._internal import _plain_int
8from ..http import http_date
9from ..http import is_byte_range_valid
10from ..http import parse_date
11from ..http import quote_etag
12from ..http import unquote_etag
13
14if t.TYPE_CHECKING:
15 import typing_extensions as te
16
# Type of the value stored and returned by a ``_CallbackProperty`` descriptor.
T = t.TypeVar("T")
18
19
class IfRange:
    """Parsed representation of an ``If-Range`` header, which carries either
    a strong ETag or a date, never both. Weak ETag values must not be used.

    .. versionadded:: 0.7
    """

    def __init__(self, etag: str | None = None, date: datetime | None = None):
        #: A strong ETag value, unquoted, without weakness information. Weak
        #: ETag values must not be used.
        self.etag = etag

        #: A parsed datetime object.
        self.date = date

    @classmethod
    def from_header(cls, value: str | None) -> te.Self:
        """Build an instance from an ``If-Range`` header value. An empty
        value or a weak ETag produces an instance with neither ``etag`` nor
        ``date`` set.

        .. versionadded:: 3.2
        """
        if not value:
            return cls()

        # A value that parses as a date takes precedence over an ETag.
        parsed_date = parse_date(value)

        if parsed_date is not None:
            return cls(date=parsed_date)

        tag, is_weak = unquote_etag(value)
        return cls() if is_weak else cls(etag=tag)

    def to_header(self) -> str:
        """Serialize to an ``If-Range`` header value. The date is used if
        set, otherwise the ETag; the result is empty if neither is set.
        """
        if self.date is not None:
            return http_date(self.date)

        if self.etag is None:
            return ""

        return quote_etag(self.etag)

    def __str__(self) -> str:
        return self.to_header()

    def __repr__(self) -> str:
        return "<{} {!r}>".format(type(self).__name__, str(self))
69
70
class Range:
    """Represents a ``Range`` header. All methods only support bytes as the
    unit. Stores a list of ranges if given, but the methods only work if
    only one range is provided.

    :param units: The units of the range, usually ``"bytes"``.
    :param ranges: A sequence of ``(begin, end)`` tuples. ``end`` is
        non-inclusive, or ``None`` for an open-ended range. A negative
        ``begin`` is only accepted together with ``end=None`` and denotes a
        suffix range (the last ``-begin`` units).
    :raise ValueError: If the ranges provided are invalid.

    .. versionchanged:: 0.15
        The ranges passed in are validated.

    .. versionadded:: 0.7
    """

    def __init__(
        self, units: str, ranges: cabc.Sequence[tuple[int, int | None]]
    ) -> None:
        #: The units of this range. Usually "bytes".
        self.units = units
        #: A list of ``(begin, end)`` tuples for the range header provided.
        #: The ``end`` of each range is non-inclusive.
        self.ranges = ranges

        for start, end in ranges:
            # With an explicit end, the start must be non-negative and
            # strictly before the end. A negative start is only meaningful
            # as a suffix range, which has no end.
            if start is None or (end is not None and (start < 0 or start >= end)):
                raise ValueError(f"{(start, end)} is not a valid range.")

    def range_for_length(self, length: int | None) -> tuple[int, int] | None:
        """If the range is for bytes, the length is not None and there is
        exactly one range and it is satisfiable it returns a ``(start, stop)``
        tuple, otherwise `None`.

        A suffix range that asks for more bytes than ``length`` selects the
        entire representation, i.e. the start is clamped to ``0``.

        :param length: The total length of the representation, if known.
        :return: A ``(start, stop)`` tuple with a non-inclusive ``stop``
            capped at ``length``, or ``None``.
        """
        if self.units != "bytes" or length is None or len(self.ranges) != 1:
            return None

        start, end = self.ranges[0]

        if end is None:
            end = length

            if start < 0:
                # Suffix range ("last N bytes"). If N exceeds the length the
                # whole representation is selected (RFC 9110 section 14.1.2),
                # so do not let the start go negative.
                start = max(start + length, 0)

        if is_byte_range_valid(start, end, length):
            return start, min(end, length)

        return None

    def make_content_range(self, length: int | None) -> ContentRange | None:
        """Creates a :class:`~werkzeug.datastructures.ContentRange` object
        from the current range and given content length.

        :return: The content range, or ``None`` if :meth:`range_for_length`
            returns ``None`` for ``length``.
        """
        rng = self.range_for_length(length)

        if rng is not None:
            return ContentRange(self.units, rng[0], rng[1], length)

        return None

    @classmethod
    def from_header(cls, value: str | None) -> te.Self | None:
        """Parse a ``Range`` header value and create an instance of this class,
        or ``None`` if the value is empty or cannot be parsed.

        .. versionadded:: 3.2
        """
        if not value or "=" not in value:
            return None

        ranges = []
        # End of the previous range. Set to -1 once an open-ended or suffix
        # range has been seen, after which no further range is accepted.
        last_end = 0
        units, _, ranges_str = value.partition("=")
        units = units.strip().lower()

        for item in ranges_str.split(","):
            item = item.strip()

            if "-" not in item:
                return None

            if item.startswith("-"):
                # Suffix range, e.g. "-500": stored as a negative begin.
                if last_end < 0:
                    return None

                try:
                    begin = _plain_int(item)
                except ValueError:
                    return None

                end = None
                last_end = -1

            else:
                begin_str, _, end_str = item.partition("-")
                begin_str = begin_str.strip()
                end_str = end_str.strip()

                try:
                    begin = _plain_int(begin_str)
                except ValueError:
                    return None

                # Ranges must be in ascending order and must not overlap.
                if begin < last_end or last_end < 0:
                    return None

                if end_str:
                    try:
                        # The header's end is inclusive; store it exclusive.
                        end = _plain_int(end_str) + 1
                    except ValueError:
                        return None

                    if begin >= end:
                        return None
                else:
                    end = None

                last_end = end if end is not None else -1

            ranges.append((begin, end))

        return cls(units, ranges)

    def to_header(self) -> str:
        """Convert to a ``Range`` header value."""
        ranges = []

        for begin, end in self.ranges:
            if end is None:
                # Open-ended ("5-") or suffix ("-5") range.
                ranges.append(f"{begin}-" if begin >= 0 else str(begin))
            else:
                # Stored end is exclusive, the header's end is inclusive.
                ranges.append(f"{begin}-{end - 1}")

        return f"{self.units}={','.join(ranges)}"

    def to_content_range_header(self, length: int | None) -> str | None:
        """Converts the object into `Content-Range` HTTP header,
        based on given length.

        :return: The header value, or ``None`` if :meth:`range_for_length`
            returns ``None`` for ``length``.
        """
        rng = self.range_for_length(length)

        if rng is not None:
            return f"{self.units} {rng[0]}-{rng[1] - 1}/{length}"

        return None

    def __str__(self) -> str:
        return self.to_header()

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {str(self)!r}>"
209
210
class _CallbackProperty(t.Generic[T]):
    """Descriptor for :class:`ContentRange` fields. The value is kept in the
    instance ``__dict__`` under the attribute name prefixed with ``_``, and
    the instance's ``_on_update`` callback, if set, is called after each
    assignment.
    """

    def __set_name__(self, owner: type[ContentRange], name: str) -> None:
        # Storage key for the value, e.g. ``start`` is stored as ``_start``.
        self.attr = "_" + name

    @t.overload
    def __get__(self, instance: None, owner: None) -> te.Self: ...
    @t.overload
    def __get__(self, instance: ContentRange, owner: type[ContentRange]) -> T: ...
    def __get__(
        self, instance: ContentRange | None, owner: type[ContentRange] | None
    ) -> te.Self | T:
        if instance is not None:
            return vars(instance)[self.attr]  # type: ignore[no-any-return]

        # Accessed on the class rather than an instance.
        return self

    def __set__(self, instance: ContentRange, value: T) -> None:
        vars(instance)[self.attr] = value
        notify = instance._on_update

        if notify is not None:
            notify(instance)
232
233
class ContentRange:
    """Represents the content range header.

    :param units: The units to use, usually ``"bytes"``. ``None`` marks the
        header as unused.
    :param start: The start point of the range, or ``None``.
    :param stop: The non-inclusive stop point of the range, or ``None``.
    :param length: The complete length, or ``None`` if unknown.
    :raise AssertionError: If ``start``, ``stop`` and ``length`` do not form
        a valid byte range.

    .. versionchanged:: 3.2
        The ``on_update`` parameter was removed.

    .. versionadded:: 0.7
    """

    def __init__(
        self,
        units: str | None,
        start: int | None,
        stop: int | None,
        length: int | None = None,
    ) -> None:
        # Optional callback, called with this instance after any field is
        # changed. Must exist before ``set`` is called, which reads it.
        self._on_update: cabc.Callable[[ContentRange], None] | None = None
        self.set(start, stop, length, units)

    #: The units to use, usually "bytes"
    units: str | None = _CallbackProperty()  # type: ignore[assignment]
    #: The start point of the range or `None`.
    start: int | None = _CallbackProperty()  # type: ignore[assignment]
    #: The stop point of the range (non-inclusive) or `None`. Can only be
    #: `None` if also start is `None`.
    stop: int | None = _CallbackProperty()  # type: ignore[assignment]
    #: The length of the range or `None`.
    length: int | None = _CallbackProperty()  # type: ignore[assignment]

    def set(
        self,
        start: int | None,
        stop: int | None,
        length: int | None = None,
        units: str | None = "bytes",
    ) -> None:
        """Update all fields of the range at once. The ``_on_update``
        callback, if set, is called a single time after all fields have
        been stored.

        :raise AssertionError: If ``start``, ``stop`` and ``length`` do not
            form a valid byte range.
        """
        # Raise explicitly rather than using an ``assert`` statement, which
        # is removed when Python runs with ``-O`` and would let an invalid
        # range through. The exception type and message are kept the same.
        if not is_byte_range_valid(start, stop, length):
            raise AssertionError("Bad range provided")

        # Write to the underlying storage directly so that the callback is
        # not triggered once per field by the descriptors.
        self._units: str | None = units
        self._start: int | None = start
        self._stop: int | None = stop
        self._length: int | None = length

        if self._on_update is not None:
            self._on_update(self)

    def unset(self) -> None:
        """Sets the units to `None` which indicates that the header should
        no longer be used.
        """
        self.set(None, None, units=None)

    @classmethod
    def from_header(cls, value: str | None) -> te.Self | None:
        """Parse a ``Content-Range`` header value and create an instance of this class,
        or ``None`` if the value is empty or cannot be parsed.

        The expected form is ``<units> <start>-<stop>/<length>``, where the
        range and the length may each be ``*``.

        .. versionadded:: 3.2
        """
        if not value:
            return None

        units, _, range_str = value.strip().partition(" ")
        rng, sep, length_str = range_str.partition("/")

        if not sep:
            return None

        if length_str == "*":
            # Complete length is unknown.
            length = None
        else:
            try:
                length = _plain_int(length_str)
            except ValueError:
                return None

        if rng == "*":
            # No range given, only a length.
            if not is_byte_range_valid(None, None, length):
                return None

            return cls(units, None, None, length)

        start_str, sep, stop_str = rng.partition("-")

        if not sep:
            return None

        try:
            start = _plain_int(start_str)
            # The header's stop is inclusive; store it exclusive.
            stop = _plain_int(stop_str) + 1
        except ValueError:
            return None

        if is_byte_range_valid(start, stop, length):
            return cls(units, start, stop, length)

        return None

    def to_header(self) -> str:
        """Convert to a ``Content-Range`` header value. The result is empty
        if the units are ``None``.
        """
        if self._units is None:
            return ""

        if self._length is None:
            length: str | int = "*"
        else:
            length = self._length

        if self._start is None:
            return f"{self._units} */{length}"

        # Stored stop is exclusive, the header's stop is inclusive.
        return f"{self._units} {self._start}-{self._stop - 1}/{length}"  # type: ignore[operator]

    def __bool__(self) -> bool:
        return self._units is not None

    def __str__(self) -> str:
        return self.to_header()

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {str(self)!r}>"