Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py: 28%

290 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import json 

2import typing 

3import typing as t 

4import warnings 

5from http import HTTPStatus 

6 

7from .._internal import _to_bytes 

8from ..datastructures import Headers 

9from ..http import remove_entity_headers 

10from ..sansio.response import Response as _SansIOResponse 

11from ..urls import iri_to_uri 

12from ..urls import url_join 

13from ..utils import cached_property 

14from ..wsgi import ClosingIterator 

15from ..wsgi import get_current_url 

16from werkzeug._internal import _get_environ 

17from werkzeug.http import generate_etag 

18from werkzeug.http import http_date 

19from werkzeug.http import is_resource_modified 

20from werkzeug.http import parse_etags 

21from werkzeug.http import parse_range_header 

22from werkzeug.wsgi import _RangeWrapper 

23 

24if t.TYPE_CHECKING: 

25 import typing_extensions as te 

26 from _typeshed.wsgi import StartResponse 

27 from _typeshed.wsgi import WSGIApplication 

28 from _typeshed.wsgi import WSGIEnvironment 

29 from .request import Request 

30 

31 

32def _warn_if_string(iterable: t.Iterable) -> None: 

33 """Helper for the response objects to check if the iterable returned 

34 to the WSGI server is not a string. 

35 """ 

36 if isinstance(iterable, str): 

37 warnings.warn( 

38 "Response iterable was set to a string. This will appear to" 

39 " work but means that the server will send the data to the" 

40 " client one character at a time. This is almost never" 

41 " intended behavior, use 'response.data' to assign strings" 

42 " to the response object.", 

43 stacklevel=2, 

44 ) 

45 

46 

47def _iter_encoded( 

48 iterable: t.Iterable[t.Union[str, bytes]], charset: str 

49) -> t.Iterator[bytes]: 

50 for item in iterable: 

51 if isinstance(item, str): 

52 yield item.encode(charset) 

53 else: 

54 yield item 

55 

56 

57def _clean_accept_ranges(accept_ranges: t.Union[bool, str]) -> str: 

58 if accept_ranges is True: 

59 return "bytes" 

60 elif accept_ranges is False: 

61 return "none" 

62 elif isinstance(accept_ranges, str): 

63 return accept_ranges 

64 raise ValueError("Invalid accept_ranges value") 

65 

66 

67class Response(_SansIOResponse): 

68 """Represents an outgoing WSGI HTTP response with body, status, and 

69 headers. Has properties and methods for using the functionality 

70 defined by various HTTP specs. 

71 

72 The response body is flexible to support different use cases. The 

73 simple form is passing bytes, or a string which will be encoded as 

74 UTF-8. Passing an iterable of bytes or strings makes this a 

75 streaming response. A generator is particularly useful for building 

76 a CSV file in memory or using SSE (Server Sent Events). A file-like 

77 object is also iterable, although the 

78 :func:`~werkzeug.utils.send_file` helper should be used in that 

79 case. 

80 

81 The response object is itself a WSGI application callable. When 

82 called (:meth:`__call__`) with ``environ`` and ``start_response``, 

83 it will pass its status and headers to ``start_response`` then 

84 return its body as an iterable. 

85 

86 .. code-block:: python 

87 

88 from werkzeug.wrappers.response import Response 

89 

90 def index(): 

91 return Response("Hello, World!") 

92 

93 def application(environ, start_response): 

94 path = environ.get("PATH_INFO") or "/" 

95 

96 if path == "/": 

97 response = index() 

98 else: 

99 response = Response("Not Found", status=404) 

100 

101 return response(environ, start_response) 

102 

103 :param response: The data for the body of the response. A string or 

104 bytes, or tuple or list of strings or bytes, for a fixed-length 

105 response, or any other iterable of strings or bytes for a 

106 streaming response. Defaults to an empty body. 

107 :param status: The status code for the response. Either an int, in 

108 which case the default status message is added, or a string in 

109 the form ``{code} {message}``, like ``404 Not Found``. Defaults 

110 to 200. 

111 :param headers: A :class:`~werkzeug.datastructures.Headers` object, 

112 or a list of ``(key, value)`` tuples that will be converted to a 

113 ``Headers`` object. 

114 :param mimetype: The mime type (content type without charset or 

115 other parameters) of the response. If the value starts with 

116 ``text/`` (or matches some other special cases), the charset 

117 will be added to create the ``content_type``. 

118 :param content_type: The full content type of the response. 

119 Overrides building the value from ``mimetype``. 

120 :param direct_passthrough: Pass the response body directly through 

121 as the WSGI iterable. This can be used when the body is a binary 

122 file or other iterator of bytes, to skip some unnecessary 

123 checks. Use :func:`~werkzeug.utils.send_file` instead of setting 

124 this manually. 

125 

126 .. versionchanged:: 2.0 

127 Combine ``BaseResponse`` and mixins into a single ``Response`` 

128 class. Using the old classes is deprecated and will be removed 

129 in Werkzeug 2.1. 

130 

131 .. versionchanged:: 0.5 

132 The ``direct_passthrough`` parameter was added. 

133 """ 

134 

135 #: if set to `False` accessing properties on the response object will 

136 #: not try to consume the response iterator and convert it into a list. 

137 #: 

138 #: .. versionadded:: 0.6.2 

139 #: 

140 #: That attribute was previously called `implicit_seqence_conversion`. 

141 #: (Notice the typo). If you did use this feature, you have to adapt 

142 #: your code to the name change. 

143 implicit_sequence_conversion = True 

144 

145 #: If a redirect ``Location`` header is a relative URL, make it an 

146 #: absolute URL, including scheme and domain. 

147 #: 

148 #: .. versionchanged:: 2.1 

149 #: This is disabled by default, so responses will send relative 

150 #: redirects. 

151 #: 

152 #: .. versionadded:: 0.8 

153 autocorrect_location_header = False 

154 

155 #: Should this response object automatically set the content-length 

156 #: header if possible? This is true by default. 

157 #: 

158 #: .. versionadded:: 0.8 

159 automatically_set_content_length = True 

160 

161 #: The response body to send as the WSGI iterable. A list of strings 

162 #: or bytes represents a fixed-length response, any other iterable 

163 #: is a streaming response. Strings are encoded to bytes as UTF-8. 

164 #: 

165 #: Do not set to a plain string or bytes, that will cause sending 

166 #: the response to be very inefficient as it will iterate one byte 

167 #: at a time. 

168 response: t.Union[t.Iterable[str], t.Iterable[bytes]] 

169 

170 def __init__( 

171 self, 

172 response: t.Optional[ 

173 t.Union[t.Iterable[bytes], bytes, t.Iterable[str], str] 

174 ] = None, 

175 status: t.Optional[t.Union[int, str, HTTPStatus]] = None, 

176 headers: t.Optional[ 

177 t.Union[ 

178 t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]], 

179 t.Iterable[t.Tuple[str, t.Union[str, int]]], 

180 ] 

181 ] = None, 

182 mimetype: t.Optional[str] = None, 

183 content_type: t.Optional[str] = None, 

184 direct_passthrough: bool = False, 

185 ) -> None: 

186 super().__init__( 

187 status=status, 

188 headers=headers, 

189 mimetype=mimetype, 

190 content_type=content_type, 

191 ) 

192 

193 #: Pass the response body directly through as the WSGI iterable. 

194 #: This can be used when the body is a binary file or other 

195 #: iterator of bytes, to skip some unnecessary checks. Use 

196 #: :func:`~werkzeug.utils.send_file` instead of setting this 

197 #: manually. 

198 self.direct_passthrough = direct_passthrough 

199 self._on_close: t.List[t.Callable[[], t.Any]] = [] 

200 

201 # we set the response after the headers so that if a class changes 

202 # the charset attribute, the data is set in the correct charset. 

203 if response is None: 

204 self.response = [] 

205 elif isinstance(response, (str, bytes, bytearray)): 

206 self.set_data(response) 

207 else: 

208 self.response = response 

209 

210 def call_on_close(self, func: t.Callable[[], t.Any]) -> t.Callable[[], t.Any]: 

211 """Adds a function to the internal list of functions that should 

212 be called as part of closing down the response. Since 0.7 this 

213 function also returns the function that was passed so that this 

214 can be used as a decorator. 

215 

216 .. versionadded:: 0.6 

217 """ 

218 self._on_close.append(func) 

219 return func 

220 

221 def __repr__(self) -> str: 

222 if self.is_sequence: 

223 body_info = f"{sum(map(len, self.iter_encoded()))} bytes" 

224 else: 

225 body_info = "streamed" if self.is_streamed else "likely-streamed" 

226 return f"<{type(self).__name__} {body_info} [{self.status}]>" 

227 

228 @classmethod 

229 def force_type( 

230 cls, response: "Response", environ: t.Optional["WSGIEnvironment"] = None 

231 ) -> "Response": 

232 """Enforce that the WSGI response is a response object of the current 

233 type. Werkzeug will use the :class:`Response` internally in many 

234 situations like the exceptions. If you call :meth:`get_response` on an 

235 exception you will get back a regular :class:`Response` object, even 

236 if you are using a custom subclass. 

237 

238 This method can enforce a given response type, and it will also 

239 convert arbitrary WSGI callables into response objects if an environ 

240 is provided:: 

241 

242 # convert a Werkzeug response object into an instance of the 

243 # MyResponseClass subclass. 

244 response = MyResponseClass.force_type(response) 

245 

246 # convert any WSGI application into a response object 

247 response = MyResponseClass.force_type(response, environ) 

248 

249 This is especially useful if you want to post-process responses in 

250 the main dispatcher and use functionality provided by your subclass. 

251 

252 Keep in mind that this will modify response objects in place if 

253 possible! 

254 

255 :param response: a response object or wsgi application. 

256 :param environ: a WSGI environment object. 

257 :return: a response object. 

258 """ 

259 if not isinstance(response, Response): 

260 if environ is None: 

261 raise TypeError( 

262 "cannot convert WSGI application into response" 

263 " objects without an environ" 

264 ) 

265 

266 from ..test import run_wsgi_app 

267 

268 response = Response(*run_wsgi_app(response, environ)) 

269 

270 response.__class__ = cls 

271 return response 

272 

273 @classmethod 

274 def from_app( 

275 cls, app: "WSGIApplication", environ: "WSGIEnvironment", buffered: bool = False 

276 ) -> "Response": 

277 """Create a new response object from an application output. This 

278 works best if you pass it an application that returns a generator all 

279 the time. Sometimes applications may use the `write()` callable 

280 returned by the `start_response` function. This tries to resolve such 

281 edge cases automatically. But if you don't get the expected output 

282 you should set `buffered` to `True` which enforces buffering. 

283 

284 :param app: the WSGI application to execute. 

285 :param environ: the WSGI environment to execute against. 

286 :param buffered: set to `True` to enforce buffering. 

287 :return: a response object. 

288 """ 

289 from ..test import run_wsgi_app 

290 

291 return cls(*run_wsgi_app(app, environ, buffered)) 

292 

293 @typing.overload 

294 def get_data(self, as_text: "te.Literal[False]" = False) -> bytes: 

295 ... 

296 

297 @typing.overload 

298 def get_data(self, as_text: "te.Literal[True]") -> str: 

299 ... 

300 

301 def get_data(self, as_text: bool = False) -> t.Union[bytes, str]: 

302 """The string representation of the response body. Whenever you call 

303 this property the response iterable is encoded and flattened. This 

304 can lead to unwanted behavior if you stream big data. 

305 

306 This behavior can be disabled by setting 

307 :attr:`implicit_sequence_conversion` to `False`. 

308 

309 If `as_text` is set to `True` the return value will be a decoded 

310 string. 

311 

312 .. versionadded:: 0.9 

313 """ 

314 self._ensure_sequence() 

315 rv = b"".join(self.iter_encoded()) 

316 

317 if as_text: 

318 return rv.decode(self.charset) 

319 

320 return rv 

321 

322 def set_data(self, value: t.Union[bytes, str]) -> None: 

323 """Sets a new string as response. The value must be a string or 

324 bytes. If a string is set it's encoded to the charset of the 

325 response (utf-8 by default). 

326 

327 .. versionadded:: 0.9 

328 """ 

329 # if a string is set, it's encoded directly so that we 

330 # can set the content length 

331 if isinstance(value, str): 

332 value = value.encode(self.charset) 

333 else: 

334 value = bytes(value) 

335 self.response = [value] 

336 if self.automatically_set_content_length: 

337 self.headers["Content-Length"] = str(len(value)) 

338 

339 data = property( 

340 get_data, 

341 set_data, 

342 doc="A descriptor that calls :meth:`get_data` and :meth:`set_data`.", 

343 ) 

344 

345 def calculate_content_length(self) -> t.Optional[int]: 

346 """Returns the content length if available or `None` otherwise.""" 

347 try: 

348 self._ensure_sequence() 

349 except RuntimeError: 

350 return None 

351 return sum(len(x) for x in self.iter_encoded()) 

352 

353 def _ensure_sequence(self, mutable: bool = False) -> None: 

354 """This method can be called by methods that need a sequence. If 

355 `mutable` is true, it will also ensure that the response sequence 

356 is a standard Python list. 

357 

358 .. versionadded:: 0.6 

359 """ 

360 if self.is_sequence: 

361 # if we need a mutable object, we ensure it's a list. 

362 if mutable and not isinstance(self.response, list): 

363 self.response = list(self.response) # type: ignore 

364 return 

365 if self.direct_passthrough: 

366 raise RuntimeError( 

367 "Attempted implicit sequence conversion but the" 

368 " response object is in direct passthrough mode." 

369 ) 

370 if not self.implicit_sequence_conversion: 

371 raise RuntimeError( 

372 "The response object required the iterable to be a" 

373 " sequence, but the implicit conversion was disabled." 

374 " Call make_sequence() yourself." 

375 ) 

376 self.make_sequence() 

377 

378 def make_sequence(self) -> None: 

379 """Converts the response iterator in a list. By default this happens 

380 automatically if required. If `implicit_sequence_conversion` is 

381 disabled, this method is not automatically called and some properties 

382 might raise exceptions. This also encodes all the items. 

383 

384 .. versionadded:: 0.6 

385 """ 

386 if not self.is_sequence: 

387 # if we consume an iterable we have to ensure that the close 

388 # method of the iterable is called if available when we tear 

389 # down the response 

390 close = getattr(self.response, "close", None) 

391 self.response = list(self.iter_encoded()) 

392 if close is not None: 

393 self.call_on_close(close) 

394 

395 def iter_encoded(self) -> t.Iterator[bytes]: 

396 """Iter the response encoded with the encoding of the response. 

397 If the response object is invoked as WSGI application the return 

398 value of this method is used as application iterator unless 

399 :attr:`direct_passthrough` was activated. 

400 """ 

401 if __debug__: 

402 _warn_if_string(self.response) 

403 # Encode in a separate function so that self.response is fetched 

404 # early. This allows us to wrap the response with the return 

405 # value from get_app_iter or iter_encoded. 

406 return _iter_encoded(self.response, self.charset) 

407 

408 @property 

409 def is_streamed(self) -> bool: 

410 """If the response is streamed (the response is not an iterable with 

411 a length information) this property is `True`. In this case streamed 

412 means that there is no information about the number of iterations. 

413 This is usually `True` if a generator is passed to the response object. 

414 

415 This is useful for checking before applying some sort of post 

416 filtering that should not take place for streamed responses. 

417 """ 

418 try: 

419 len(self.response) # type: ignore 

420 except (TypeError, AttributeError): 

421 return True 

422 return False 

423 

424 @property 

425 def is_sequence(self) -> bool: 

426 """If the iterator is buffered, this property will be `True`. A 

427 response object will consider an iterator to be buffered if the 

428 response attribute is a list or tuple. 

429 

430 .. versionadded:: 0.6 

431 """ 

432 return isinstance(self.response, (tuple, list)) 

433 

434 def close(self) -> None: 

435 """Close the wrapped response if possible. You can also use the object 

436 in a with statement which will automatically close it. 

437 

438 .. versionadded:: 0.9 

439 Can now be used in a with statement. 

440 """ 

441 if hasattr(self.response, "close"): 

442 self.response.close() # type: ignore 

443 for func in self._on_close: 

444 func() 

445 

446 def __enter__(self) -> "Response": 

447 return self 

448 

449 def __exit__(self, exc_type, exc_value, tb): # type: ignore 

450 self.close() 

451 

452 def freeze(self) -> None: 

453 """Make the response object ready to be pickled. Does the 

454 following: 

455 

456 * Buffer the response into a list, ignoring 

457 :attr:`implicity_sequence_conversion` and 

458 :attr:`direct_passthrough`. 

459 * Set the ``Content-Length`` header. 

460 * Generate an ``ETag`` header if one is not already set. 

461 

462 .. versionchanged:: 2.1 

463 Removed the ``no_etag`` parameter. 

464 

465 .. versionchanged:: 2.0 

466 An ``ETag`` header is added, the ``no_etag`` parameter is 

467 deprecated and will be removed in Werkzeug 2.1. 

468 

469 .. versionchanged:: 0.6 

470 The ``Content-Length`` header is set. 

471 """ 

472 # Always freeze the encoded response body, ignore 

473 # implicit_sequence_conversion and direct_passthrough. 

474 self.response = list(self.iter_encoded()) 

475 self.headers["Content-Length"] = str(sum(map(len, self.response))) 

476 self.add_etag() 

477 

478 def get_wsgi_headers(self, environ: "WSGIEnvironment") -> Headers: 

479 """This is automatically called right before the response is started 

480 and returns headers modified for the given environment. It returns a 

481 copy of the headers from the response with some modifications applied 

482 if necessary. 

483 

484 For example the location header (if present) is joined with the root 

485 URL of the environment. Also the content length is automatically set 

486 to zero here for certain status codes. 

487 

488 .. versionchanged:: 0.6 

489 Previously that function was called `fix_headers` and modified 

490 the response object in place. Also since 0.6, IRIs in location 

491 and content-location headers are handled properly. 

492 

493 Also starting with 0.6, Werkzeug will attempt to set the content 

494 length if it is able to figure it out on its own. This is the 

495 case if all the strings in the response iterable are already 

496 encoded and the iterable is buffered. 

497 

498 :param environ: the WSGI environment of the request. 

499 :return: returns a new :class:`~werkzeug.datastructures.Headers` 

500 object. 

501 """ 

502 headers = Headers(self.headers) 

503 location: t.Optional[str] = None 

504 content_location: t.Optional[str] = None 

505 content_length: t.Optional[t.Union[str, int]] = None 

506 status = self.status_code 

507 

508 # iterate over the headers to find all values in one go. Because 

509 # get_wsgi_headers is used each response that gives us a tiny 

510 # speedup. 

511 for key, value in headers: 

512 ikey = key.lower() 

513 if ikey == "location": 

514 location = value 

515 elif ikey == "content-location": 

516 content_location = value 

517 elif ikey == "content-length": 

518 content_length = value 

519 

520 # make sure the location header is an absolute URL 

521 if location is not None: 

522 old_location = location 

523 if isinstance(location, str): 

524 # Safe conversion is necessary here as we might redirect 

525 # to a broken URI scheme (for instance itms-services). 

526 location = iri_to_uri(location, safe_conversion=True) 

527 

528 if self.autocorrect_location_header: 

529 current_url = get_current_url(environ, strip_querystring=True) 

530 if isinstance(current_url, str): 

531 current_url = iri_to_uri(current_url) 

532 location = url_join(current_url, location) 

533 if location != old_location: 

534 headers["Location"] = location 

535 

536 # make sure the content location is a URL 

537 if content_location is not None and isinstance(content_location, str): 

538 headers["Content-Location"] = iri_to_uri(content_location) 

539 

540 if 100 <= status < 200 or status == 204: 

541 # Per section 3.3.2 of RFC 7230, "a server MUST NOT send a 

542 # Content-Length header field in any response with a status 

543 # code of 1xx (Informational) or 204 (No Content)." 

544 headers.remove("Content-Length") 

545 elif status == 304: 

546 remove_entity_headers(headers) 

547 

548 # if we can determine the content length automatically, we 

549 # should try to do that. But only if this does not involve 

550 # flattening the iterator or encoding of strings in the 

551 # response. We however should not do that if we have a 304 

552 # response. 

553 if ( 

554 self.automatically_set_content_length 

555 and self.is_sequence 

556 and content_length is None 

557 and status not in (204, 304) 

558 and not (100 <= status < 200) 

559 ): 

560 try: 

561 content_length = sum(len(_to_bytes(x, "ascii")) for x in self.response) 

562 except UnicodeError: 

563 # Something other than bytes, can't safely figure out 

564 # the length of the response. 

565 pass 

566 else: 

567 headers["Content-Length"] = str(content_length) 

568 

569 return headers 

570 

571 def get_app_iter(self, environ: "WSGIEnvironment") -> t.Iterable[bytes]: 

572 """Returns the application iterator for the given environ. Depending 

573 on the request method and the current status code the return value 

574 might be an empty response rather than the one from the response. 

575 

576 If the request method is `HEAD` or the status code is in a range 

577 where the HTTP specification requires an empty response, an empty 

578 iterable is returned. 

579 

580 .. versionadded:: 0.6 

581 

582 :param environ: the WSGI environment of the request. 

583 :return: a response iterable. 

584 """ 

585 status = self.status_code 

586 if ( 

587 environ["REQUEST_METHOD"] == "HEAD" 

588 or 100 <= status < 200 

589 or status in (204, 304) 

590 ): 

591 iterable: t.Iterable[bytes] = () 

592 elif self.direct_passthrough: 

593 if __debug__: 

594 _warn_if_string(self.response) 

595 return self.response # type: ignore 

596 else: 

597 iterable = self.iter_encoded() 

598 return ClosingIterator(iterable, self.close) 

599 

600 def get_wsgi_response( 

601 self, environ: "WSGIEnvironment" 

602 ) -> t.Tuple[t.Iterable[bytes], str, t.List[t.Tuple[str, str]]]: 

603 """Returns the final WSGI response as tuple. The first item in 

604 the tuple is the application iterator, the second the status and 

605 the third the list of headers. The response returned is created 

606 specially for the given environment. For example if the request 

607 method in the WSGI environment is ``'HEAD'`` the response will 

608 be empty and only the headers and status code will be present. 

609 

610 .. versionadded:: 0.6 

611 

612 :param environ: the WSGI environment of the request. 

613 :return: an ``(app_iter, status, headers)`` tuple. 

614 """ 

615 headers = self.get_wsgi_headers(environ) 

616 app_iter = self.get_app_iter(environ) 

617 return app_iter, self.status, headers.to_wsgi_list() 

618 

619 def __call__( 

620 self, environ: "WSGIEnvironment", start_response: "StartResponse" 

621 ) -> t.Iterable[bytes]: 

622 """Process this response as WSGI application. 

623 

624 :param environ: the WSGI environment. 

625 :param start_response: the response callable provided by the WSGI 

626 server. 

627 :return: an application iterator 

628 """ 

629 app_iter, status, headers = self.get_wsgi_response(environ) 

630 start_response(status, headers) 

631 return app_iter 

632 

633 # JSON 

634 

635 #: A module or other object that has ``dumps`` and ``loads`` 

636 #: functions that match the API of the built-in :mod:`json` module. 

637 json_module = json 

638 

639 @property 

640 def json(self) -> t.Optional[t.Any]: 

641 """The parsed JSON data if :attr:`mimetype` indicates JSON 

642 (:mimetype:`application/json`, see :attr:`is_json`). 

643 

644 Calls :meth:`get_json` with default arguments. 

645 """ 

646 return self.get_json() 

647 

648 def get_json(self, force: bool = False, silent: bool = False) -> t.Optional[t.Any]: 

649 """Parse :attr:`data` as JSON. Useful during testing. 

650 

651 If the mimetype does not indicate JSON 

652 (:mimetype:`application/json`, see :attr:`is_json`), this 

653 returns ``None``. 

654 

655 Unlike :meth:`Request.get_json`, the result is not cached. 

656 

657 :param force: Ignore the mimetype and always try to parse JSON. 

658 :param silent: Silence parsing errors and return ``None`` 

659 instead. 

660 """ 

661 if not (force or self.is_json): 

662 return None 

663 

664 data = self.get_data() 

665 

666 try: 

667 return self.json_module.loads(data) 

668 except ValueError: 

669 if not silent: 

670 raise 

671 

672 return None 

673 

674 # Stream 

675 

676 @cached_property 

677 def stream(self) -> "ResponseStream": 

678 """The response iterable as write-only stream.""" 

679 return ResponseStream(self) 

680 

681 def _wrap_range_response(self, start: int, length: int) -> None: 

682 """Wrap existing Response in case of Range Request context.""" 

683 if self.status_code == 206: 

684 self.response = _RangeWrapper(self.response, start, length) # type: ignore 

685 

686 def _is_range_request_processable(self, environ: "WSGIEnvironment") -> bool: 

687 """Return ``True`` if `Range` header is present and if underlying 

688 resource is considered unchanged when compared with `If-Range` header. 

689 """ 

690 return ( 

691 "HTTP_IF_RANGE" not in environ 

692 or not is_resource_modified( 

693 environ, 

694 self.headers.get("etag"), 

695 None, 

696 self.headers.get("last-modified"), 

697 ignore_if_range=False, 

698 ) 

699 ) and "HTTP_RANGE" in environ 

700 

701 def _process_range_request( 

702 self, 

703 environ: "WSGIEnvironment", 

704 complete_length: t.Optional[int] = None, 

705 accept_ranges: t.Optional[t.Union[bool, str]] = None, 

706 ) -> bool: 

707 """Handle Range Request related headers (RFC7233). If `Accept-Ranges` 

708 header is valid, and Range Request is processable, we set the headers 

709 as described by the RFC, and wrap the underlying response in a 

710 RangeWrapper. 

711 

712 Returns ``True`` if Range Request can be fulfilled, ``False`` otherwise. 

713 

714 :raises: :class:`~werkzeug.exceptions.RequestedRangeNotSatisfiable` 

715 if `Range` header could not be parsed or satisfied. 

716 

717 .. versionchanged:: 2.0 

718 Returns ``False`` if the length is 0. 

719 """ 

720 from ..exceptions import RequestedRangeNotSatisfiable 

721 

722 if ( 

723 accept_ranges is None 

724 or complete_length is None 

725 or complete_length == 0 

726 or not self._is_range_request_processable(environ) 

727 ): 

728 return False 

729 

730 parsed_range = parse_range_header(environ.get("HTTP_RANGE")) 

731 

732 if parsed_range is None: 

733 raise RequestedRangeNotSatisfiable(complete_length) 

734 

735 range_tuple = parsed_range.range_for_length(complete_length) 

736 content_range_header = parsed_range.to_content_range_header(complete_length) 

737 

738 if range_tuple is None or content_range_header is None: 

739 raise RequestedRangeNotSatisfiable(complete_length) 

740 

741 content_length = range_tuple[1] - range_tuple[0] 

742 self.headers["Content-Length"] = content_length 

743 self.headers["Accept-Ranges"] = accept_ranges 

744 self.content_range = content_range_header # type: ignore 

745 self.status_code = 206 

746 self._wrap_range_response(range_tuple[0], content_length) 

747 return True 

748 

749 def make_conditional( 

750 self, 

751 request_or_environ: t.Union["WSGIEnvironment", "Request"], 

752 accept_ranges: t.Union[bool, str] = False, 

753 complete_length: t.Optional[int] = None, 

754 ) -> "Response": 

755 """Make the response conditional to the request. This method works 

756 best if an etag was defined for the response already. The `add_etag` 

757 method can be used to do that. If called without etag just the date 

758 header is set. 

759 

760 This does nothing if the request method in the request or environ is 

761 anything but GET or HEAD. 

762 

763 For optimal performance when handling range requests, it's recommended 

764 that your response data object implements `seekable`, `seek` and `tell` 

765 methods as described by :py:class:`io.IOBase`. Objects returned by 

766 :meth:`~werkzeug.wsgi.wrap_file` automatically implement those methods. 

767 

768 It does not remove the body of the response because that's something 

769 the :meth:`__call__` function does for us automatically. 

770 

771 Returns self so that you can do ``return resp.make_conditional(req)`` 

772 but modifies the object in-place. 

773 

774 :param request_or_environ: a request object or WSGI environment to be 

775 used to make the response conditional 

776 against. 

777 :param accept_ranges: This parameter dictates the value of 

778 `Accept-Ranges` header. If ``False`` (default), 

779 the header is not set. If ``True``, it will be set 

780 to ``"bytes"``. If ``None``, it will be set to 

781 ``"none"``. If it's a string, it will use this 

782 value. 

783 :param complete_length: Will be used only in valid Range Requests. 

784 It will set `Content-Range` complete length 

785 value and compute `Content-Length` real value. 

786 This parameter is mandatory for successful 

787 Range Requests completion. 

788 :raises: :class:`~werkzeug.exceptions.RequestedRangeNotSatisfiable` 

789 if `Range` header could not be parsed or satisfied. 

790 

791 .. versionchanged:: 2.0 

792 Range processing is skipped if length is 0 instead of 

793 raising a 416 Range Not Satisfiable error. 

794 """ 

795 environ = _get_environ(request_or_environ) 

796 if environ["REQUEST_METHOD"] in ("GET", "HEAD"): 

797 # if the date is not in the headers, add it now. We however 

798 # will not override an already existing header. Unfortunately 

799 # this header will be overridden by many WSGI servers including 

800 # wsgiref. 

801 if "date" not in self.headers: 

802 self.headers["Date"] = http_date() 

803 accept_ranges = _clean_accept_ranges(accept_ranges) 

804 is206 = self._process_range_request(environ, complete_length, accept_ranges) 

805 if not is206 and not is_resource_modified( 

806 environ, 

807 self.headers.get("etag"), 

808 None, 

809 self.headers.get("last-modified"), 

810 ): 

811 if parse_etags(environ.get("HTTP_IF_MATCH")): 

812 self.status_code = 412 

813 else: 

814 self.status_code = 304 

815 if ( 

816 self.automatically_set_content_length 

817 and "content-length" not in self.headers 

818 ): 

819 length = self.calculate_content_length() 

820 if length is not None: 

821 self.headers["Content-Length"] = length 

822 return self 

823 

824 def add_etag(self, overwrite: bool = False, weak: bool = False) -> None: 

825 """Add an etag for the current response if there is none yet. 

826 

827 .. versionchanged:: 2.0 

828 SHA-1 is used to generate the value. MD5 may not be 

829 available in some environments. 

830 """ 

831 if overwrite or "etag" not in self.headers: 

832 self.set_etag(generate_etag(self.get_data()), weak) 

833 

834 

835class ResponseStream: 

836 """A file descriptor like object used by :meth:`Response.stream` to 

837 represent the body of the stream. It directly pushes into the 

838 response iterable of the response object. 

839 """ 

840 

841 mode = "wb+" 

842 

843 def __init__(self, response: Response): 

844 self.response = response 

845 self.closed = False 

846 

847 def write(self, value: bytes) -> int: 

848 if self.closed: 

849 raise ValueError("I/O operation on closed file") 

850 self.response._ensure_sequence(mutable=True) 

851 self.response.response.append(value) # type: ignore 

852 self.response.headers.pop("Content-Length", None) 

853 return len(value) 

854 

855 def writelines(self, seq: t.Iterable[bytes]) -> None: 

856 for item in seq: 

857 self.write(item) 

858 

859 def close(self) -> None: 

860 self.closed = True 

861 

862 def flush(self) -> None: 

863 if self.closed: 

864 raise ValueError("I/O operation on closed file") 

865 

866 def isatty(self) -> bool: 

867 if self.closed: 

868 raise ValueError("I/O operation on closed file") 

869 return False 

870 

871 def tell(self) -> int: 

872 self.response._ensure_sequence() 

873 return sum(map(len, self.response.response)) 

874 

875 @property 

876 def encoding(self) -> str: 

877 return self.response.charset