Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/datastructures/range.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import typing as t 

5from datetime import datetime 

6 

7if t.TYPE_CHECKING: 

8 import typing_extensions as te 

9 

10T = t.TypeVar("T") 

11 

12 

13class IfRange: 

14 """Very simple object that represents the `If-Range` header in parsed 

15 form. It will either have neither a etag or date or one of either but 

16 never both. 

17 

18 .. versionadded:: 0.7 

19 """ 

20 

21 def __init__(self, etag: str | None = None, date: datetime | None = None): 

22 #: The etag parsed and unquoted. Ranges always operate on strong 

23 #: etags so the weakness information is not necessary. 

24 self.etag = etag 

25 #: The date in parsed format or `None`. 

26 self.date = date 

27 

28 def to_header(self) -> str: 

29 """Converts the object back into an HTTP header.""" 

30 if self.date is not None: 

31 return http.http_date(self.date) 

32 if self.etag is not None: 

33 return http.quote_etag(self.etag) 

34 return "" 

35 

36 def __str__(self) -> str: 

37 return self.to_header() 

38 

39 def __repr__(self) -> str: 

40 return f"<{type(self).__name__} {str(self)!r}>" 

41 

42 

43class Range: 

44 """Represents a ``Range`` header. All methods only support only 

45 bytes as the unit. Stores a list of ranges if given, but the methods 

46 only work if only one range is provided. 

47 

48 :raise ValueError: If the ranges provided are invalid. 

49 

50 .. versionchanged:: 0.15 

51 The ranges passed in are validated. 

52 

53 .. versionadded:: 0.7 

54 """ 

55 

56 def __init__( 

57 self, units: str, ranges: cabc.Sequence[tuple[int, int | None]] 

58 ) -> None: 

59 #: The units of this range. Usually "bytes". 

60 self.units = units 

61 #: A list of ``(begin, end)`` tuples for the range header provided. 

62 #: The ranges are non-inclusive. 

63 self.ranges = ranges 

64 

65 for start, end in ranges: 

66 if start is None or (end is not None and (start < 0 or start >= end)): 

67 raise ValueError(f"{(start, end)} is not a valid range.") 

68 

69 def range_for_length(self, length: int | None) -> tuple[int, int] | None: 

70 """If the range is for bytes, the length is not None and there is 

71 exactly one range and it is satisfiable it returns a ``(start, stop)`` 

72 tuple, otherwise `None`. 

73 """ 

74 if self.units != "bytes" or length is None or len(self.ranges) != 1: 

75 return None 

76 start, end = self.ranges[0] 

77 if end is None: 

78 end = length 

79 if start < 0: 

80 start += length 

81 if http.is_byte_range_valid(start, end, length): 

82 return start, min(end, length) 

83 return None 

84 

85 def make_content_range(self, length: int | None) -> ContentRange | None: 

86 """Creates a :class:`~werkzeug.datastructures.ContentRange` object 

87 from the current range and given content length. 

88 """ 

89 rng = self.range_for_length(length) 

90 if rng is not None: 

91 return ContentRange(self.units, rng[0], rng[1], length) 

92 return None 

93 

94 def to_header(self) -> str: 

95 """Converts the object back into an HTTP header.""" 

96 ranges = [] 

97 for begin, end in self.ranges: 

98 if end is None: 

99 ranges.append(f"{begin}-" if begin >= 0 else str(begin)) 

100 else: 

101 ranges.append(f"{begin}-{end - 1}") 

102 return f"{self.units}={','.join(ranges)}" 

103 

104 def to_content_range_header(self, length: int | None) -> str | None: 

105 """Converts the object into `Content-Range` HTTP header, 

106 based on given length 

107 """ 

108 range = self.range_for_length(length) 

109 if range is not None: 

110 return f"{self.units} {range[0]}-{range[1] - 1}/{length}" 

111 return None 

112 

113 def __str__(self) -> str: 

114 return self.to_header() 

115 

116 def __repr__(self) -> str: 

117 return f"<{type(self).__name__} {str(self)!r}>" 

118 

119 

120class _CallbackProperty(t.Generic[T]): 

121 def __set_name__(self, owner: type[ContentRange], name: str) -> None: 

122 self.attr = f"_{name}" 

123 

124 @t.overload 

125 def __get__(self, instance: None, owner: None) -> te.Self: ... 

126 @t.overload 

127 def __get__(self, instance: ContentRange, owner: type[ContentRange]) -> T: ... 

128 def __get__( 

129 self, instance: ContentRange | None, owner: type[ContentRange] | None 

130 ) -> te.Self | T: 

131 if instance is None: 

132 return self 

133 

134 return instance.__dict__[self.attr] # type: ignore[no-any-return] 

135 

136 def __set__(self, instance: ContentRange, value: T) -> None: 

137 instance.__dict__[self.attr] = value 

138 

139 if instance.on_update is not None: 

140 instance.on_update(instance) 

141 

142 

143class ContentRange: 

144 """Represents the content range header. 

145 

146 .. versionadded:: 0.7 

147 """ 

148 

149 def __init__( 

150 self, 

151 units: str | None, 

152 start: int | None, 

153 stop: int | None, 

154 length: int | None = None, 

155 on_update: cabc.Callable[[ContentRange], None] | None = None, 

156 ) -> None: 

157 self.on_update = on_update 

158 self.set(start, stop, length, units) 

159 

160 #: The units to use, usually "bytes" 

161 units: str | None = _CallbackProperty() # type: ignore[assignment] 

162 #: The start point of the range or `None`. 

163 start: int | None = _CallbackProperty() # type: ignore[assignment] 

164 #: The stop point of the range (non-inclusive) or `None`. Can only be 

165 #: `None` if also start is `None`. 

166 stop: int | None = _CallbackProperty() # type: ignore[assignment] 

167 #: The length of the range or `None`. 

168 length: int | None = _CallbackProperty() # type: ignore[assignment] 

169 

170 def set( 

171 self, 

172 start: int | None, 

173 stop: int | None, 

174 length: int | None = None, 

175 units: str | None = "bytes", 

176 ) -> None: 

177 """Simple method to update the ranges.""" 

178 assert http.is_byte_range_valid(start, stop, length), "Bad range provided" 

179 self._units: str | None = units 

180 self._start: int | None = start 

181 self._stop: int | None = stop 

182 self._length: int | None = length 

183 if self.on_update is not None: 

184 self.on_update(self) 

185 

186 def unset(self) -> None: 

187 """Sets the units to `None` which indicates that the header should 

188 no longer be used. 

189 """ 

190 self.set(None, None, units=None) 

191 

192 def to_header(self) -> str: 

193 if self._units is None: 

194 return "" 

195 if self._length is None: 

196 length: str | int = "*" 

197 else: 

198 length = self._length 

199 if self._start is None: 

200 return f"{self._units} */{length}" 

201 return f"{self._units} {self._start}-{self._stop - 1}/{length}" # type: ignore[operator] 

202 

203 def __bool__(self) -> bool: 

204 return self._units is not None 

205 

206 def __str__(self) -> str: 

207 return self.to_header() 

208 

209 def __repr__(self) -> str: 

210 return f"<{type(self).__name__} {str(self)!r}>" 

211 

212 

213# circular dependencies 

214from .. import http