1# Copyright 2017 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
"""Virtual base classes for downloading media from Google APIs."""
16
17
18import http.client
19import re
20
21from google.cloud.storage._media import _helpers
22from google.cloud.storage.exceptions import InvalidResponse
23from google.cloud.storage.retry import DEFAULT_RETRY
24
25
# HTTP verb used for every download request built in this module.
_GET = "GET"

# Status codes accepted for a download response: 200 (OK) and
# 206 (Partial Content).
_ACCEPTABLE_STATUS_CODES = (http.client.OK, http.client.PARTIAL_CONTENT)

# ``Content-Range`` value that, together with a 416 status code, is treated
# as the marker of a zero-byte resource.
_ZERO_CONTENT_RANGE_HEADER = "bytes */0"

# Parser for a ``Content-Range`` header of the form
# ``bytes {start}-{end}/{total}``; matched case-insensitively. Each group
# captures a run of decimal digits.
_CONTENT_RANGE_RE = re.compile(
    r"bytes (?P<start_byte>\d+)-(?P<end_byte>\d+)/(?P<total_bytes>\d+)",
    re.IGNORECASE,
)
33
34
class DownloadBase(object):
    """Common state and hooks shared by every download helper.

    Concrete download types build on this class; it only records the
    request parameters and declares the response accessors that a
    transport-specific subclass has to supply.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded.
        end (int): The last byte in a range to be downloaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
            When omitted, a new empty dictionary is used; otherwise the
            given mapping itself is stored (no copy is made).
        retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            A None value will disable retries. A google.api_core.retry.Retry
            value will enable retries, and the object will configure backoff
            and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
    """

    def __init__(
        self,
        media_url,
        stream=None,
        start=None,
        end=None,
        headers=None,
        retry=DEFAULT_RETRY,
    ):
        # Public description of what is being downloaded.
        self.media_url = media_url
        self.start = start
        self.end = end
        # Private collaborators and bookkeeping.
        self._stream = stream
        self._headers = {} if headers is None else headers
        self._retry_strategy = retry
        self._finished = False

    @property
    def finished(self):
        """bool: Flag indicating if the download has completed."""
        return self._finished

    @staticmethod
    def _get_status_code(response):
        """Return the status code of an HTTP response.

        Placeholder that a transport-specific subclass must override.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Return the headers of an HTTP response.

        Placeholder that a transport-specific subclass must override.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Return the body of an HTTP response.

        Placeholder that a transport-specific subclass must override.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
122
123
class Download(DownloadBase):
    """Helper that manages a one-request download from a Google API.

    A "slice" of the resource can be requested by supplying ``start``
    and / or ``end``. In typical usage neither of them is provided.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded. If not
            provided, but ``end`` is provided, will download from the
            beginning to ``end`` of the media.
        end (int): The last byte in a range to be downloaded. If not
            provided, but ``start`` is provided, will download from the
            ``start`` to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The response headers must contain
            a checksum of the requested type. If the headers lack an
            appropriate checksum (for instance in the case of transcoded or
            ranged downloads where the remote service does not know the
            correct checksum) an INFO-level log will be emitted. Supported
            values are "md5", "crc32c", "auto" and None. The default is
            "auto", which is resolved at construction time to "crc32c" when
            the crc32c implementation is reported as available and fast,
            and to "md5" otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.
        single_shot_download (Optional[bool]): If true, download the object
            in a single request.
            Caution: Enabling this will increase the memory overhead for
            your application. Please enable this as per your use case.
    """

    def __init__(
        self,
        media_url,
        stream=None,
        start=None,
        end=None,
        headers=None,
        checksum="auto",
        retry=DEFAULT_RETRY,
        single_shot_download=False,
    ):
        super().__init__(
            media_url,
            stream=stream,
            start=start,
            end=end,
            headers=headers,
            retry=retry,
        )
        # Resolve the "auto" placeholder once so that ``self.checksum``
        # always names a concrete algorithm (or is whatever the caller
        # passed explicitly, including None).
        if checksum == "auto":
            if _helpers._is_crc32c_available_and_fast():
                checksum = "crc32c"
            else:
                checksum = "md5"
        self.checksum = checksum
        self.single_shot_download = single_shot_download
        # Per-download bookkeeping; this class only initializes it.
        self._bytes_downloaded = 0
        self._expected_checksum = None
        self._checksum_object = None
        self._object_generation = None

    def _prepare_request(self):
        """Build the pieces of the HTTP request without doing any I/O.

        Everything that has to happen before the request is sent, and that
        needs no network (or other) I/O, is done here. This follows the
        `sans-I/O`_ philosophy.

        The stored headers mapping is updated in place with the ``range``
        header when ``start`` and / or ``end`` is set.

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

            * HTTP verb for the request (always GET)
            * the URL for the request
            * the body of the request (always :data:`None`)
            * headers for the request

        Raises:
            ValueError: If the current :class:`Download` has already
                finished.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("A download can only be used once.")

        request_headers = self._headers
        add_bytes_range(self.start, self.end, request_headers)
        return _GET, self.media_url, None, request_headers

    def _process_response(self, response):
        """Validate the HTTP response without doing any I/O.

        Everything that has to happen after the request completes, and that
        needs no network (or other) I/O, is done here. This follows the
        `sans-I/O`_ philosophy.

        The download is marked as finished *before* the status code is
        checked, so an instance cannot be reused even when the response is
        rejected.

        Args:
            response (object): The HTTP response object.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Tombstone first: a Download is single-use regardless of outcome.
        self._finished = True
        _helpers.require_status_code(
            response,
            _ACCEPTABLE_STATUS_CODES,
            self._get_status_code,
        )

    def consume(self, transport, timeout=None):
        """Consume the resource to be downloaded.

        If a ``stream`` is attached to this download, then the downloaded
        resource will be written to the stream.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for
                details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
257
258
class ChunkedDownload(DownloadBase):
    """Download a resource from a Google API one chunk at a time.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each
            request.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            will be used to concatenate chunks of the resource as they are
            downloaded.
        start (int): The first byte in a range to be downloaded. If not
            provided, defaults to ``0``.
        end (int): The last byte in a range to be downloaded. If not
            provided, will download to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with each request, e.g. headers for data encryption
            key headers.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each request.

    Raises:
        ValueError: If ``start`` is negative.
    """

    def __init__(
        self,
        media_url,
        chunk_size,
        stream,
        start=0,
        end=None,
        headers=None,
        retry=DEFAULT_RETRY,
    ):
        # Reject a negative offset before any state is set up.
        if start < 0:
            raise ValueError(
                "On a chunked download the starting value cannot be negative."
            )
        super().__init__(
            media_url,
            stream=stream,
            start=start,
            end=end,
            headers=headers,
            retry=retry,
        )
        self.chunk_size = chunk_size
        # Progress tracking, updated as chunk responses are processed.
        self._bytes_downloaded = 0
        self._total_bytes = None
        self._invalid = False

    @property
    def bytes_downloaded(self):
        """int: Number of bytes that have been downloaded."""
        return self._bytes_downloaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be downloaded."""
        return self._total_bytes

    @property
    def invalid(self):
        """bool: Indicates if the download is in an invalid state.

        This will occur if a call to :meth:`consume_next_chunk` fails.
        """
        return self._invalid

    def _get_byte_range(self):
        """Compute the byte range that the next request should ask for.

        The range begins right after the bytes already downloaded and spans
        ``chunk_size`` bytes, clipped so that it never goes past ``end`` or
        past ``total_bytes - 1`` when either of them is known.

        Returns:
            Tuple[int, int]: The pair of begin and end byte for the next
            chunked request.
        """
        first_byte = self.start + self.bytes_downloaded
        # Collect every upper bound that applies; the smallest one wins.
        upper_bounds = [first_byte + self.chunk_size - 1]
        if self.end is not None:
            upper_bounds.append(self.end)
        if self.total_bytes is not None:
            upper_bounds.append(self.total_bytes - 1)
        return first_byte, min(upper_bounds)

    def _prepare_request(self):
        """Build the pieces of the next HTTP request without doing any I/O.

        Everything that has to happen before the request is sent, and that
        needs no network (or other) I/O, is done here. This follows the
        `sans-I/O`_ philosophy.

        .. note::

            This method is used once per chunk, so the stored headers are
            mutated between requests. No copy is made because the same
            keys are overwritten each time.

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

            * HTTP verb for the request (always GET)
            * the URL for the request
            * the body of the request (always :data:`None`)
            * headers for the request

        Raises:
            ValueError: If the current download has finished.
            ValueError: If the current download is invalid.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Download has finished.")
        if self.invalid:
            raise ValueError("Download is invalid and cannot be re-used.")

        request_headers = self._headers
        first_byte, last_byte = self._get_byte_range()
        add_bytes_range(first_byte, last_byte, request_headers)
        return _GET, self.media_url, None, request_headers

    def _make_invalid(self):
        """Mark this download as invalid.

        Meant to be handed to helpers as a callback, so that they can flag
        this instance right before they raise an exception.
        """
        self._invalid = True

    def _process_response(self, response):
        """Validate a chunk response, update progress and store the chunk.

        Everything that has to happen after the request completes, and that
        needs no network I/O, is done here. This follows the `sans-I/O`_
        philosophy, with one exception: the chunk is written to ``stream``,
        which is I/O but (almost) certainly not network I/O.

        A 416 response whose ``content-range`` is ``bytes */0`` is treated
        as an empty resource: the download is marked finished and nothing
        else happens.

        Otherwise the status code and the ``content-range`` header are
        validated. When the response has no ``transfer-encoding`` header,
        the ``content-length`` header is required and must equal the size
        of the body; with a ``transfer-encoding`` header the byte count is
        derived from the ``content-range`` bounds instead.
        ``bytes_downloaded`` is then advanced, ``finished`` is set once the
        chunk reaches ``end`` or the last byte of the resource, and
        ``total_bytes`` is recorded if it was not known yet (an already
        known value is kept and not compared against the headers).

        Args:
            response (object): The HTTP response object (need headers).

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the number
                of bytes in the body doesn't match the content length header.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Special case: an empty resource answers a ranged request with 416.
        is_empty_resource = _check_for_zero_content_range(
            response, self._get_status_code, self._get_headers
        )
        if is_empty_resource:
            self._finished = True
            return

        # Validate everything before touching the progress counters; each
        # helper flags the instance as invalid before it raises.
        _helpers.require_status_code(
            response,
            _ACCEPTABLE_STATUS_CODES,
            self._get_status_code,
            callback=self._make_invalid,
        )
        response_headers = self._get_headers(response)
        payload = self._get_body(response)

        range_start, range_end, resource_size = get_range_info(
            response, self._get_headers, callback=self._make_invalid
        )

        if response_headers.get("transfer-encoding") is not None:
            # 'content-length' header not allowed with chunked encoding, so
            # fall back to the size implied by the content range.
            chunk_length = range_end - range_start + 1
        else:
            declared_length = _helpers.header_required(
                response,
                "content-length",
                self._get_headers,
                callback=self._make_invalid,
            )
            chunk_length = int(declared_length)
            if len(payload) != chunk_length:
                self._make_invalid()
                raise InvalidResponse(
                    response,
                    "Response is different size than content-length",
                    "Expected",
                    chunk_length,
                    "Received",
                    len(payload),
                )

        # The response is acceptable: record progress.
        self._bytes_downloaded += chunk_length

        # Done once the chunk reaches the requested ``end`` or the final
        # byte of the resource.
        reached_requested_end = self.end is not None and range_end >= self.end
        if reached_requested_end or range_end >= resource_size - 1:
            self._finished = True

        # NOTE: The reported total is only adopted if none is known yet.
        if self.total_bytes is None:
            self._total_bytes = resource_size

        # Finally append the chunk to the output stream.
        self._stream.write(payload)

    def consume_next_chunk(self, transport, timeout=None):
        """Consume the next chunk of the resource to be downloaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for
                details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
510
511
def add_bytes_range(start, end, headers):
    """Store a bytes range in a header dictionary, if one is requested.

    The mapping is updated in place; nothing is returned. When both
    ``start`` and ``end`` are :data:`None` the mapping is left untouched.

    Some possible inputs and the corresponding bytes ranges::

       >>> headers = {}
       >>> add_bytes_range(None, None, headers)
       >>> headers
       {}
       >>> add_bytes_range(500, 999, headers)
       >>> headers['range']
       'bytes=500-999'
       >>> add_bytes_range(None, 499, headers)
       >>> headers['range']
       'bytes=0-499'
       >>> add_bytes_range(-500, None, headers)
       >>> headers['range']
       'bytes=-500'
       >>> add_bytes_range(9500, None, headers)
       >>> headers['range']
       'bytes=9500-'

    Args:
        start (Optional[int]): The first byte in a range. Can be zero,
            positive, negative or :data:`None`.
        end (Optional[int]): The last byte in a range. Assumed to be
            positive.
        headers (Mapping[str, str]): A headers mapping which can have the
            bytes range added if at least one of ``start`` or ``end``
            is not :data:`None`.
    """
    if start is None and end is None:
        # Nothing was requested, so there is no range to add.
        return

    if start is None:
        # Only an end: read from the first byte.
        # NOTE: This assumes ``end`` is non-negative.
        range_spec = f"0-{end:d}"
    elif end is not None:
        # Both bounds given.
        # NOTE: This is invalid if ``start < 0``.
        range_spec = f"{start:d}-{end:d}"
    elif start < 0:
        # A negative start on its own already carries the leading "-".
        range_spec = f"{start:d}"
    else:
        # Open-ended range from ``start`` onwards.
        range_spec = f"{start:d}-"

    headers[_helpers.RANGE_HEADER] = f"bytes={range_spec}"
561
562
def get_range_info(response, get_headers, callback=_helpers.do_nothing):
    """Parse the start, end and total bytes out of a content range header.

    Args:
        response (object): An HTTP response object.
        get_headers (Callable[Any, Mapping[str, str]]): Helper to get headers
            from an HTTP response.
        callback (Optional[Callable]): A callback that takes no arguments,
            to be executed when an exception is being raised.

    Returns:
        Tuple[int, int, int]: The start byte, end byte and total bytes.

    Raises:
        ~google.cloud.storage.exceptions.InvalidResponse: If the
            ``Content-Range`` header is not of the form
            ``bytes {start}-{end}/{total}``.
    """
    header_value = _helpers.header_required(
        response,
        _helpers.CONTENT_RANGE_HEADER,
        get_headers,
        callback=callback,
    )
    parsed = _CONTENT_RANGE_RE.match(header_value)
    if parsed is not None:
        # Every group holds only digits, so the conversions cannot fail.
        return tuple(
            int(parsed.group(group_name))
            for group_name in ("start_byte", "end_byte", "total_bytes")
        )

    # Give the caller a chance to react before the error is raised.
    callback()
    raise InvalidResponse(
        response,
        "Unexpected content-range header",
        header_value,
        'Expected to be of the form "bytes {start}-{end}/{total}"',
    )
599
600
def _check_for_zero_content_range(response, get_status_code, get_headers):
    """Tell whether a response is a 416 with a zero-length content range.

    This is the special case for handling zero bytes files. The headers are
    only consulted when the status code is 416.

    Args:
        response (object): An HTTP response object.
        get_status_code (Callable[Any, int]): Helper to get a status code
            from a response.
        get_headers (Callable[Any, Mapping[str, str]]): Helper to get headers
            from an HTTP response.

    Returns:
        bool: True if content range total bytes is zero, false otherwise.
    """
    status_code = get_status_code(response)
    if status_code != http.client.REQUESTED_RANGE_NOT_SATISFIABLE:
        # Any other status code cannot be the zero-byte marker.
        return False

    range_header = _helpers.header_required(
        response,
        _helpers.CONTENT_RANGE_HEADER,
        get_headers,
        callback=_helpers.do_nothing,
    )
    return range_header == _ZERO_CONTENT_RANGE_HEADER