Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/resumable_media/requests/download.py: 5%
149 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1# Copyright 2017 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Support for downloading media from Google APIs."""
17import urllib3.response # type: ignore
18import http
20from google.resumable_media import _download
21from google.resumable_media import common
22from google.resumable_media import _helpers
23from google.resumable_media.requests import _request_helpers
# Message template for common.DataCorruption when the locally computed digest
# disagrees with the server-reported one. Positional fields are, in order:
# media URL, expected checksum, actual checksum; ``checksum_type`` is passed
# by keyword (e.g. "MD5" or "CRC32C").
_CHECKSUM_MISMATCH = """\
Checksum mismatch while downloading:

  {}

The X-Goog-Hash header indicated an {checksum_type} checksum of:

  {}

but the actual {checksum_type} checksum of the downloaded contents was:

  {}
"""

# Message template used when rewinding the user-supplied stream fails while
# handling a decompressive-transcoding (gzip) download. Single positional
# field: the request URL.
_STREAM_SEEK_ERROR = """\
Incomplete download for:
{}
Error writing to stream while handling a gzip-compressed file download.
Please restart the download.
"""
class Download(_request_helpers.RequestsMixin, _download.Download):
    """Helper to manage downloading a resource from a Google API.

    "Slices" of the resource can be retrieved by specifying a range
    with ``start`` and / or ``end``. However, in typical usage, neither
    ``start`` nor ``end`` is expected to be provided.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded. If not
            provided, but ``end`` is provided, will download from the
            beginning to ``end`` of the media.
        end (int): The last byte in a range to be downloaded. If not
            provided, but ``start`` is provided, will download from the
            ``start`` to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The response headers must contain
            a checksum of the requested type. If the headers lack an
            appropriate checksum (for instance in the case of transcoded or
            ranged downloads where the remote service does not know the
            correct checksum) an INFO-level log will be emitted. Supported
            values are "md5", "crc32c" and None. The default is "md5".

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
    """

    def _write_to_stream(self, response):
        """Write response body to a write-able stream.

        .. note:

            This method assumes that the ``_stream`` attribute is set on the
            current download.

        Args:
            response (~requests.Response): The HTTP response object.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
        """
        # Retrieve the expected checksum only once for the download request,
        # then compute and validate the checksum when the full download completes.
        # Retried requests are range requests, and there's no way to detect
        # data corruption for that byte range alone.
        if self._expected_checksum is None and self._checksum_object is None:
            # `_get_expected_checksum()` may return None even if a checksum was
            # requested, in which case it will emit an info log _MISSING_CHECKSUM.
            # If an invalid checksum type is specified, this will raise ValueError.
            expected_checksum, checksum_object = _helpers._get_expected_checksum(
                response, self._get_headers, self.media_url, checksum_type=self.checksum
            )
            self._expected_checksum = expected_checksum
            self._checksum_object = checksum_object
        else:
            # A retried (range) request reuses the checksum state accumulated
            # so far, since the digest must cover the whole object.
            expected_checksum = self._expected_checksum
            checksum_object = self._checksum_object

        with response:
            # NOTE: In order to handle compressed streams gracefully, we try
            # to insert our checksum object into the decompression stream. If
            # the stream is indeed compressed, this will delegate the checksum
            # object to the decoder and return a _DoNothingHash here.
            local_checksum_object = _add_decoder(response.raw, checksum_object)
            body_iter = response.iter_content(
                chunk_size=_request_helpers._SINGLE_GET_CHUNK_SIZE, decode_unicode=False
            )
            for chunk in body_iter:
                self._stream.write(chunk)
                # Track progress so an interrupted download can resume from
                # this offset via a range request (see consume()).
                self._bytes_downloaded += len(chunk)
                local_checksum_object.update(chunk)

        # Don't validate the checksum for partial responses.
        if (
            expected_checksum is not None
            and response.status_code != http.client.PARTIAL_CONTENT
        ):
            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
            if actual_checksum != expected_checksum:
                msg = _CHECKSUM_MISMATCH.format(
                    self.media_url,
                    expected_checksum,
                    actual_checksum,
                    checksum_type=self.checksum.upper(),
                )
                raise common.DataCorruption(response, msg)

    def consume(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Consume the resource to be downloaded.

        If a ``stream`` is attached to this download, then the downloaded
        resource will be written to the stream.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
            ValueError: If the current :class:`Download` has already
                finished.
        """
        method, _, payload, headers = self._prepare_request()
        # NOTE: We assume "payload is None" but pass it along anyway.
        request_kwargs = {
            "data": payload,
            "headers": headers,
            "timeout": timeout,
        }
        if self._stream is not None:
            # Stream the body so chunks can be written and checksummed
            # incrementally instead of buffering the whole object in memory.
            request_kwargs["stream"] = True

        # Assign object generation if generation is specified in the media url.
        if self._object_generation is None:
            self._object_generation = _helpers._get_generation_from_url(self.media_url)

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            url = self.media_url

            # To restart an interrupted download, read from the offset of last byte
            # received using a range request, and set object generation query param.
            if self._bytes_downloaded > 0:
                _download.add_bytes_range(
                    self._bytes_downloaded, self.end, self._headers
                )
                request_kwargs["headers"] = self._headers

            # Set object generation query param to ensure the same object content is requested.
            if (
                self._object_generation is not None
                and _helpers._get_generation_from_url(self.media_url) is None
            ):
                query_param = {"generation": self._object_generation}
                url = _helpers.add_query_parameters(self.media_url, query_param)

            result = transport.request(method, url, **request_kwargs)

            # If a generation hasn't been specified, and this is the first response we get, let's record the
            # generation. In future requests we'll specify the generation query param to avoid data races.
            if self._object_generation is None:
                self._object_generation = _helpers._parse_generation_header(
                    result, self._get_headers
                )

            self._process_response(result)

            # With decompressive transcoding, GCS serves back the whole file regardless of the range request,
            # thus we reset the stream position to the start of the stream.
            # See: https://cloud.google.com/storage/docs/transcoding#range
            if self._stream is not None:
                if _helpers._is_decompressive_transcoding(result, self._get_headers):
                    try:
                        self._stream.seek(0)
                    except Exception as exc:
                        msg = _STREAM_SEEK_ERROR.format(url)
                        raise Exception(msg) from exc
                    # The whole object is being re-served, so restart the
                    # byte count from zero as well.
                    self._bytes_downloaded = 0

                self._write_to_stream(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )
class RawDownload(_request_helpers.RawRequestsMixin, _download.Download):
    """Helper to manage downloading a raw resource from a Google API.

    "Slices" of the resource can be retrieved by specifying a range
    with ``start`` and / or ``end``. However, in typical usage, neither
    ``start`` nor ``end`` is expected to be provided.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded. If not
            provided, but ``end`` is provided, will download from the
            beginning to ``end`` of the media.
        end (int): The last byte in a range to be downloaded. If not
            provided, but ``start`` is provided, will download from the
            ``start`` to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The response headers must contain
            a checksum of the requested type. If the headers lack an
            appropriate checksum (for instance in the case of transcoded or
            ranged downloads where the remote service does not know the
            correct checksum) an INFO-level log will be emitted. Supported
            values are "md5", "crc32c" and None. The default is "md5".

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
    """

    def _write_to_stream(self, response):
        """Write response body to a write-able stream.

        .. note:

            This method assumes that the ``_stream`` attribute is set on the
            current download.

        Args:
            response (~requests.Response): The HTTP response object.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
        """
        # Retrieve the expected checksum only once for the download request,
        # then compute and validate the checksum when the full download completes.
        # Retried requests are range requests, and there's no way to detect
        # data corruption for that byte range alone.
        if self._expected_checksum is None and self._checksum_object is None:
            # `_get_expected_checksum()` may return None even if a checksum was
            # requested, in which case it will emit an info log _MISSING_CHECKSUM.
            # If an invalid checksum type is specified, this will raise ValueError.
            expected_checksum, checksum_object = _helpers._get_expected_checksum(
                response, self._get_headers, self.media_url, checksum_type=self.checksum
            )
            self._expected_checksum = expected_checksum
            self._checksum_object = checksum_object
        else:
            # A retried (range) request reuses the checksum state accumulated
            # so far, since the digest must cover the whole object.
            expected_checksum = self._expected_checksum
            checksum_object = self._checksum_object

        with response:
            # Read raw bytes with decode_content=False so the checksum is
            # computed over exactly the bytes the server sent; unlike
            # Download._write_to_stream, no decoder patching is needed.
            body_iter = response.raw.stream(
                _request_helpers._SINGLE_GET_CHUNK_SIZE, decode_content=False
            )
            for chunk in body_iter:
                self._stream.write(chunk)
                # Track progress so an interrupted download can resume from
                # this offset via a range request (see consume()).
                self._bytes_downloaded += len(chunk)
                checksum_object.update(chunk)
            # We drained ``response.raw`` directly; flag the requests-level
            # response as consumed so it does not try to read the body again.
            response._content_consumed = True

        # Don't validate the checksum for partial responses.
        if (
            expected_checksum is not None
            and response.status_code != http.client.PARTIAL_CONTENT
        ):
            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())

            if actual_checksum != expected_checksum:
                msg = _CHECKSUM_MISMATCH.format(
                    self.media_url,
                    expected_checksum,
                    actual_checksum,
                    checksum_type=self.checksum.upper(),
                )
                raise common.DataCorruption(response, msg)

    def consume(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Consume the resource to be downloaded.

        If a ``stream`` is attached to this download, then the downloaded
        resource will be written to the stream.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the download's
                checksum doesn't agree with server-computed checksum.
            ValueError: If the current :class:`Download` has already
                finished.
        """
        method, _, payload, headers = self._prepare_request()
        # NOTE: We assume "payload is None" but pass it along anyway.
        # Raw downloads always stream, since the body is read via
        # ``response.raw`` in ``_write_to_stream``.
        request_kwargs = {
            "data": payload,
            "headers": headers,
            "timeout": timeout,
            "stream": True,
        }

        # Assign object generation if generation is specified in the media url.
        if self._object_generation is None:
            self._object_generation = _helpers._get_generation_from_url(self.media_url)

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            url = self.media_url

            # To restart an interrupted download, read from the offset of last byte
            # received using a range request, and set object generation query param.
            if self._bytes_downloaded > 0:
                _download.add_bytes_range(
                    self._bytes_downloaded, self.end, self._headers
                )
                request_kwargs["headers"] = self._headers

            # Set object generation query param to ensure the same object content is requested.
            if (
                self._object_generation is not None
                and _helpers._get_generation_from_url(self.media_url) is None
            ):
                query_param = {"generation": self._object_generation}
                url = _helpers.add_query_parameters(self.media_url, query_param)

            result = transport.request(method, url, **request_kwargs)

            # If a generation hasn't been specified, and this is the first response we get, let's record the
            # generation. In future requests we'll specify the generation query param to avoid data races.
            if self._object_generation is None:
                self._object_generation = _helpers._parse_generation_header(
                    result, self._get_headers
                )

            self._process_response(result)

            # With decompressive transcoding, GCS serves back the whole file regardless of the range request,
            # thus we reset the stream position to the start of the stream.
            # See: https://cloud.google.com/storage/docs/transcoding#range
            if self._stream is not None:
                if _helpers._is_decompressive_transcoding(result, self._get_headers):
                    try:
                        self._stream.seek(0)
                    except Exception as exc:
                        msg = _STREAM_SEEK_ERROR.format(url)
                        raise Exception(msg) from exc
                    # The whole object is being re-served, so restart the
                    # byte count from zero as well.
                    self._bytes_downloaded = 0

                self._write_to_stream(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )
class ChunkedDownload(_request_helpers.RequestsMixin, _download.ChunkedDownload):
    """Download a resource in chunks from a Google API.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each
            request.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            will be used to concatenate chunks of the resource as they are
            downloaded.
        start (int): The first byte in a range to be downloaded. If not
            provided, defaults to ``0``.
        end (int): The last byte in a range to be downloaded. If not
            provided, will download to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with each request, e.g. headers for data encryption
            key headers.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each request.

    Raises:
        ValueError: If ``start`` is negative.
    """

    def consume_next_chunk(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Download and process the next chunk of the resource.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ValueError: If the current download has finished.
        """
        method, url, payload, headers = self._prepare_request()
        # NOTE: ``payload`` is expected to be None for downloads, but it is
        # forwarded to the transport regardless.
        request_kwargs = {"data": payload, "headers": headers, "timeout": timeout}

        def _attempt_request():
            # One retriable attempt: issue the request, then let the base
            # class update this download's bookkeeping from the response.
            response = transport.request(method, url, **request_kwargs)
            self._process_response(response)
            return response

        return _request_helpers.wait_and_retry(
            _attempt_request, self._get_status_code, self._retry_strategy
        )
class RawChunkedDownload(_request_helpers.RawRequestsMixin, _download.ChunkedDownload):
    """Download a raw resource in chunks from a Google API.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each
            request.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            will be used to concatenate chunks of the resource as they are
            downloaded.
        start (int): The first byte in a range to be downloaded. If not
            provided, defaults to ``0``.
        end (int): The last byte in a range to be downloaded. If not
            provided, will download to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with each request, e.g. headers for data encryption
            key headers.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each request.

    Raises:
        ValueError: If ``start`` is negative.
    """

    def consume_next_chunk(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Download and process the next chunk of the raw resource.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ValueError: If the current download has finished.
        """
        method, url, payload, headers = self._prepare_request()
        # NOTE: ``payload`` is expected to be None for downloads, but it is
        # forwarded to the transport regardless. ``stream=True`` keeps the
        # raw body available instead of eagerly decoding it.
        request_kwargs = {
            "data": payload,
            "headers": headers,
            "stream": True,
            "timeout": timeout,
        }

        def _attempt_request():
            # One retriable attempt: issue the request, then let the base
            # class update this download's bookkeeping from the response.
            response = transport.request(method, url, **request_kwargs)
            self._process_response(response)
            return response

        return _request_helpers.wait_and_retry(
            _attempt_request, self._get_status_code, self._retry_strategy
        )
def _add_decoder(response_raw, checksum):
    """Patch the ``_decoder`` on a ``urllib3`` response.

    This is so that we can intercept the compressed bytes before they are
    decoded.

    Only patches if the content encoding is ``gzip`` or ``br``.

    Args:
        response_raw (urllib3.response.HTTPResponse): The raw response for
            an HTTP request.
        checksum (object):
            A checksum which will be updated with compressed bytes.

    Returns:
        object: Either the original ``checksum`` if ``_decoder`` is not
        patched, or a ``_DoNothingHash`` if the decoder is patched, since the
        caller will no longer need to hash to decoded bytes.
    """
    content_encoding = response_raw.headers.get("content-encoding", "").lower()

    if content_encoding == "gzip":
        # Hand the checksum to the decoder so it sees pre-decompression bytes;
        # the caller gets a no-op hash in exchange.
        response_raw._decoder = _GzipDecoder(checksum)
        return _helpers._DoNothingHash()

    if content_encoding == "br" and _BrotliDecoder:  # type: ignore
        # _BrotliDecoder is None when the brotli package is not installed.
        response_raw._decoder = _BrotliDecoder(checksum)
        return _helpers._DoNothingHash()

    # Uncompressed (or unsupported encoding): caller keeps hashing directly.
    return checksum
class _GzipDecoder(urllib3.response.GzipDecoder):
    """``urllib3`` gzip decoder that feeds compressed bytes to a checksum.

    The wrapped checksum observes every byte *before* decompression, so the
    resulting digest covers the compressed value as stored on the server.

    Args:
        checksum (object):
            A checksum which will be updated with compressed bytes.
    """

    def __init__(self, checksum):
        super().__init__()
        self._checksum = checksum

    def decompress(self, data):
        """Update the checksum with ``data``, then decompress it.

        Args:
            data (bytes): The compressed bytes to be decompressed.

        Returns:
            bytes: The decompressed bytes from ``data``.
        """
        self._checksum.update(data)
        decoded = super().decompress(data)
        return decoded
# urllib3.response.BrotliDecoder might not exist depending on whether brotli is
# installed.
if hasattr(urllib3.response, "BrotliDecoder"):

    class _BrotliDecoder:
        """Checksum-aware proxy around ``urllib3``'s brotli decoder.

        The wrapped checksum observes every byte *before* decompression, so
        the resulting digest covers the compressed value as stored on the
        server.

        ``urllib3`` creates ``BrotliDecoder.decompress`` dynamically, which
        makes subclassing impractical; instead this class holds a captive
        ``urllib3.response.BrotliDecoder`` instance and forwards to it.

        Args:
            checksum (object):
                A checksum which will be updated with compressed bytes.
        """

        def __init__(self, checksum):
            self._decoder = urllib3.response.BrotliDecoder()
            self._checksum = checksum

        def decompress(self, data):
            """Update the checksum with ``data``, then decompress it.

            Args:
                data (bytes): The compressed bytes to be decompressed.

            Returns:
                bytes: The decompressed bytes from ``data``.
            """
            self._checksum.update(data)
            return self._decoder.decompress(data)

        def flush(self):
            """Forward ``flush`` to the captive decoder."""
            return self._decoder.flush()

else:  # pragma: NO COVER
    _BrotliDecoder = None  # type: ignore # pragma: NO COVER