1from __future__ import annotations 
    2 
    3import collections.abc as cabc 
    4import typing as t 
    5from datetime import datetime 
    6 
    7if t.TYPE_CHECKING: 
    8    import typing_extensions as te 
    9 
    10T = t.TypeVar("T") 
    11 
    12 
    13class IfRange: 
    14    """Very simple object that represents the `If-Range` header in parsed 
    15    form.  It will either have neither a etag or date or one of either but 
    16    never both. 
    17 
    18    .. versionadded:: 0.7 
    19    """ 
    20 
    21    def __init__(self, etag: str | None = None, date: datetime | None = None): 
    22        #: The etag parsed and unquoted.  Ranges always operate on strong 
    23        #: etags so the weakness information is not necessary. 
    24        self.etag = etag 
    25        #: The date in parsed format or `None`. 
    26        self.date = date 
    27 
    28    def to_header(self) -> str: 
    29        """Converts the object back into an HTTP header.""" 
    30        if self.date is not None: 
    31            return http.http_date(self.date) 
    32        if self.etag is not None: 
    33            return http.quote_etag(self.etag) 
    34        return "" 
    35 
    36    def __str__(self) -> str: 
    37        return self.to_header() 
    38 
    39    def __repr__(self) -> str: 
    40        return f"<{type(self).__name__} {str(self)!r}>" 
    41 
    42 
    43class Range: 
    44    """Represents a ``Range`` header. All methods only support only 
    45    bytes as the unit. Stores a list of ranges if given, but the methods 
    46    only work if only one range is provided. 
    47 
    48    :raise ValueError: If the ranges provided are invalid. 
    49 
    50    .. versionchanged:: 0.15 
    51        The ranges passed in are validated. 
    52 
    53    .. versionadded:: 0.7 
    54    """ 
    55 
    56    def __init__( 
    57        self, units: str, ranges: cabc.Sequence[tuple[int, int | None]] 
    58    ) -> None: 
    59        #: The units of this range.  Usually "bytes". 
    60        self.units = units 
    61        #: A list of ``(begin, end)`` tuples for the range header provided. 
    62        #: The ranges are non-inclusive. 
    63        self.ranges = ranges 
    64 
    65        for start, end in ranges: 
    66            if start is None or (end is not None and (start < 0 or start >= end)): 
    67                raise ValueError(f"{(start, end)} is not a valid range.") 
    68 
    69    def range_for_length(self, length: int | None) -> tuple[int, int] | None: 
    70        """If the range is for bytes, the length is not None and there is 
    71        exactly one range and it is satisfiable it returns a ``(start, stop)`` 
    72        tuple, otherwise `None`. 
    73        """ 
    74        if self.units != "bytes" or length is None or len(self.ranges) != 1: 
    75            return None 
    76        start, end = self.ranges[0] 
    77        if end is None: 
    78            end = length 
    79            if start < 0: 
    80                start += length 
    81        if http.is_byte_range_valid(start, end, length): 
    82            return start, min(end, length) 
    83        return None 
    84 
    85    def make_content_range(self, length: int | None) -> ContentRange | None: 
    86        """Creates a :class:`~werkzeug.datastructures.ContentRange` object 
    87        from the current range and given content length. 
    88        """ 
    89        rng = self.range_for_length(length) 
    90        if rng is not None: 
    91            return ContentRange(self.units, rng[0], rng[1], length) 
    92        return None 
    93 
    94    def to_header(self) -> str: 
    95        """Converts the object back into an HTTP header.""" 
    96        ranges = [] 
    97        for begin, end in self.ranges: 
    98            if end is None: 
    99                ranges.append(f"{begin}-" if begin >= 0 else str(begin)) 
    100            else: 
    101                ranges.append(f"{begin}-{end - 1}") 
    102        return f"{self.units}={','.join(ranges)}" 
    103 
    104    def to_content_range_header(self, length: int | None) -> str | None: 
    105        """Converts the object into `Content-Range` HTTP header, 
    106        based on given length 
    107        """ 
    108        range = self.range_for_length(length) 
    109        if range is not None: 
    110            return f"{self.units} {range[0]}-{range[1] - 1}/{length}" 
    111        return None 
    112 
    113    def __str__(self) -> str: 
    114        return self.to_header() 
    115 
    116    def __repr__(self) -> str: 
    117        return f"<{type(self).__name__} {str(self)!r}>" 
    118 
    119 
    120class _CallbackProperty(t.Generic[T]): 
    121    def __set_name__(self, owner: type[ContentRange], name: str) -> None: 
    122        self.attr = f"_{name}" 
    123 
    124    @t.overload 
    125    def __get__(self, instance: None, owner: None) -> te.Self: ... 
    126    @t.overload 
    127    def __get__(self, instance: ContentRange, owner: type[ContentRange]) -> T: ... 
    128    def __get__( 
    129        self, instance: ContentRange | None, owner: type[ContentRange] | None 
    130    ) -> te.Self | T: 
    131        if instance is None: 
    132            return self 
    133 
    134        return instance.__dict__[self.attr]  # type: ignore[no-any-return] 
    135 
    136    def __set__(self, instance: ContentRange, value: T) -> None: 
    137        instance.__dict__[self.attr] = value 
    138 
    139        if instance.on_update is not None: 
    140            instance.on_update(instance) 
    141 
    142 
    143class ContentRange: 
    144    """Represents the content range header. 
    145 
    146    .. versionadded:: 0.7 
    147    """ 
    148 
    149    def __init__( 
    150        self, 
    151        units: str | None, 
    152        start: int | None, 
    153        stop: int | None, 
    154        length: int | None = None, 
    155        on_update: cabc.Callable[[ContentRange], None] | None = None, 
    156    ) -> None: 
    157        self.on_update = on_update 
    158        self.set(start, stop, length, units) 
    159 
    160    #: The units to use, usually "bytes" 
    161    units: str | None = _CallbackProperty()  # type: ignore[assignment] 
    162    #: The start point of the range or `None`. 
    163    start: int | None = _CallbackProperty()  # type: ignore[assignment] 
    164    #: The stop point of the range (non-inclusive) or `None`.  Can only be 
    165    #: `None` if also start is `None`. 
    166    stop: int | None = _CallbackProperty()  # type: ignore[assignment] 
    167    #: The length of the range or `None`. 
    168    length: int | None = _CallbackProperty()  # type: ignore[assignment] 
    169 
    170    def set( 
    171        self, 
    172        start: int | None, 
    173        stop: int | None, 
    174        length: int | None = None, 
    175        units: str | None = "bytes", 
    176    ) -> None: 
    177        """Simple method to update the ranges.""" 
    178        assert http.is_byte_range_valid(start, stop, length), "Bad range provided" 
    179        self._units: str | None = units 
    180        self._start: int | None = start 
    181        self._stop: int | None = stop 
    182        self._length: int | None = length 
    183        if self.on_update is not None: 
    184            self.on_update(self) 
    185 
    186    def unset(self) -> None: 
    187        """Sets the units to `None` which indicates that the header should 
    188        no longer be used. 
    189        """ 
    190        self.set(None, None, units=None) 
    191 
    192    def to_header(self) -> str: 
    193        if self._units is None: 
    194            return "" 
    195        if self._length is None: 
    196            length: str | int = "*" 
    197        else: 
    198            length = self._length 
    199        if self._start is None: 
    200            return f"{self._units} */{length}" 
    201        return f"{self._units} {self._start}-{self._stop - 1}/{length}"  # type: ignore[operator] 
    202 
    203    def __bool__(self) -> bool: 
    204        return self._units is not None 
    205 
    206    def __str__(self) -> str: 
    207        return self.to_header() 
    208 
    209    def __repr__(self) -> str: 
    210        return f"<{type(self).__name__} {str(self)!r}>" 
    211 
    212 
    213# circular dependencies 
    214from .. import http