Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/storage/_media/_download.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

141 statements  

1# Copyright 2017 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Virtual base classes for downloading media from Google APIs.""" 

16 

17 

18import http.client 

19import re 

20 

21from google.cloud.storage._media import _helpers 

22from google.cloud.storage.exceptions import InvalidResponse 

23from google.cloud.storage.retry import DEFAULT_RETRY 

24 

25 

# Matches a ``Content-Range`` header value of the form
# ``bytes {start}-{end}/{total}`` (case-insensitively) and exposes the three
# numbers as the named groups ``start_byte``, ``end_byte`` and ``total_bytes``.
_CONTENT_RANGE_RE = re.compile(
    r"bytes (?P<start_byte>\d+)-(?P<end_byte>\d+)/(?P<total_bytes>\d+)",
    flags=re.IGNORECASE,
)
# Status codes treated as success for a download response: 200 (OK) and
# 206 (Partial Content).
_ACCEPTABLE_STATUS_CODES = (http.client.OK, http.client.PARTIAL_CONTENT)
# HTTP verb returned by the ``_prepare_request`` methods in this module.
_GET = "GET"
# ``Content-Range`` value that, on a 416 response, is treated as the marker of
# a zero-byte resource (see ``_check_for_zero_content_range``).
_ZERO_CONTENT_RANGE_HEADER = "bytes */0"

33 

34 

class DownloadBase:
    """Shared foundation for the download helpers.

    Stores the state common to every download type (target URL, output
    stream, requested byte range, request headers, retry strategy and a
    completion flag) and declares the response accessors that concrete
    subclasses are expected to override.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded.
        end (int): The last byte in a range to be downloaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
            The mapping is stored as given (not copied); an empty dict is
            created when none is supplied.
        retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            A None value will disable retries. A google.api_core.retry.Retry
            value will enable retries, and the object will configure backoff
            and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
    """

    def __init__(
        self,
        media_url,
        stream=None,
        start=None,
        end=None,
        headers=None,
        retry=DEFAULT_RETRY,
    ):
        # Public description of what is being downloaded.
        self.media_url = media_url
        self.start = start
        self.end = end
        # Private collaborators and bookkeeping.
        self._stream = stream
        # Deliberately keep the caller's mapping rather than copying it; only
        # fall back to a brand-new dict when nothing was passed in.
        self._headers = {} if headers is None else headers
        self._retry_strategy = retry
        self._finished = False

    @property
    def finished(self):
        """bool: Whether this download has been marked as completed."""
        return self._finished

    @staticmethod
    def _get_status_code(response):
        """Extract the status code from an HTTP response.

        Placeholder to be overridden by a concrete implementation.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Extract the headers from an HTTP response.

        Placeholder to be overridden by a concrete implementation.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Extract the body from an HTTP response.

        Placeholder to be overridden by a concrete implementation.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

122 

123 

class Download(DownloadBase):
    """Manage the download of a resource from a Google API.

    A "slice" of the resource can be requested by giving ``start`` and / or
    ``end``. In typical usage neither is expected to be provided.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            the downloaded resource can be written to.
        start (int): The first byte in a range to be downloaded. If not
            provided, but ``end`` is provided, will download from the
            beginning to ``end`` of the media.
        end (int): The last byte in a range to be downloaded. If not
            provided, but ``start`` is provided, will download from the
            ``start`` to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The response headers must contain
            a checksum of the requested type. If the headers lack an
            appropriate checksum (for instance in the case of transcoded or
            ranged downloads where the remote service does not know the
            correct checksum) an INFO-level log will be emitted. Supported
            values are "md5", "crc32c", "auto" and None. The default is
            "auto", which will try to detect if the C extension for crc32c
            is installed and fall back to md5 otherwise.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.
        single_shot_download (Optional[bool]): If true, download the object
            in a single request. Caution: enabling this increases the memory
            overhead of your application; enable it only if it suits your
            use case.
    """

    def __init__(
        self,
        media_url,
        stream=None,
        start=None,
        end=None,
        headers=None,
        checksum="auto",
        retry=DEFAULT_RETRY,
        single_shot_download=False,
    ):
        super().__init__(
            media_url,
            stream=stream,
            start=start,
            end=end,
            headers=headers,
            retry=retry,
        )
        # Resolve the "auto" placeholder to a concrete algorithm up front so
        # ``self.checksum`` never holds the placeholder value.
        if checksum == "auto":
            if _helpers._is_crc32c_available_and_fast():
                checksum = "crc32c"
            else:
                checksum = "md5"
        self.checksum = checksum
        self.single_shot_download = single_shot_download
        # Bookkeeping initialised here; nothing in this class updates it.
        self._bytes_downloaded = 0
        self._expected_checksum = None
        self._checksum_object = None
        self._object_generation = None

    def _prepare_request(self):
        """Build the pieces of the HTTP request without doing any I/O.

        Everything that can be done before the request without network I/O
        (or other I/O) happens here, following the `sans-I/O`_ philosophy.

        The stored headers mapping is updated in place with the byte range
        (when ``start`` and / or ``end`` is set) and is itself returned.

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

            * HTTP verb for the request (always GET)
            * the URL for the request
            * the body of the request (always :data:`None`)
            * headers for the request

        Raises:
            ValueError: If the current :class:`Download` has already
                finished.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("A download can only be used once.")

        request_headers = self._headers
        add_bytes_range(self.start, self.end, request_headers)
        return _GET, self.media_url, None, request_headers

    def _process_response(self, response):
        """Handle the HTTP response without doing any I/O.

        Everything that can be done after the request without network I/O
        (or other I/O) happens here, following the `sans-I/O`_ philosophy.

        Args:
            response (object): The HTTP response object.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Mark this instance as used *before* validating, so that it cannot
        # be reused even when the status check below raises.
        self._finished = True
        _helpers.require_status_code(
            response,
            _ACCEPTABLE_STATUS_CODES,
            self._get_status_code,
        )

    def consume(self, transport, timeout=None):
        """Consume the resource to be downloaded.

        If a ``stream`` is attached to this download, then the downloaded
        resource will be written to the stream.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

257 

258 

class ChunkedDownload(DownloadBase):
    """Download a resource from a Google API one chunk at a time.

    Args:
        media_url (str): The URL containing the media to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each
            request.
        stream (IO[bytes]): A write-able stream (i.e. file-like object) that
            will be used to concatenate chunks of the resource as they are
            downloaded.
        start (int): The first byte in a range to be downloaded. If not
            provided, defaults to ``0``.
        end (int): The last byte in a range to be downloaded. If not
            provided, will download to the end of the media.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with each request, e.g. headers for data encryption
            key headers.
        retry (Optional[google.api_core.retry.Retry]): How to retry the
            RPC. A None value will disable retries. A
            google.api_core.retry.Retry value will enable retries, and the
            object will configure backoff and timeout options.

            See the retry.py source code and docstrings in this package
            (google.cloud.storage.retry) for information on retry types and
            how to configure them.

    Attributes:
        media_url (str): The URL containing the media to be downloaded.
        start (Optional[int]): The first byte in a range to be downloaded.
        end (Optional[int]): The last byte in a range to be downloaded.
        chunk_size (int): The number of bytes to be retrieved in each request.

    Raises:
        ValueError: If ``start`` is negative.
    """

    def __init__(
        self,
        media_url,
        chunk_size,
        stream,
        start=0,
        end=None,
        headers=None,
        retry=DEFAULT_RETRY,
    ):
        # Validate before touching any state: chunk offsets are computed
        # relative to ``start``, so it cannot be negative.
        if start < 0:
            raise ValueError(
                "On a chunked download the starting value cannot be negative."
            )
        super().__init__(
            media_url,
            stream=stream,
            start=start,
            end=end,
            headers=headers,
            retry=retry,
        )
        self.chunk_size = chunk_size
        # Progress tracking, updated by ``_process_response``.
        self._bytes_downloaded = 0
        self._total_bytes = None
        self._invalid = False

    @property
    def bytes_downloaded(self):
        """int: How many bytes have been downloaded so far."""
        return self._bytes_downloaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be downloaded.

        Stays :data:`None` until a response has reported it.
        """
        return self._total_bytes

    @property
    def invalid(self):
        """bool: Whether the download is in an invalid state.

        This will occur if a call to :meth:`consume_next_chunk` fails.
        """
        return self._invalid

    def _get_byte_range(self):
        """Work out which bytes the next request should ask for.

        Returns:
            Tuple[int, int]: The pair of begin and end byte for the next
            chunked request.
        """
        first_byte = self.start + self.bytes_downloaded
        # Collect every upper bound that applies, then take the smallest:
        # a full chunk, the requested ``end`` (if any) and the last byte of
        # the resource (once ``total_bytes`` is known).
        upper_bounds = [first_byte + self.chunk_size - 1]
        if self.end is not None:
            upper_bounds.append(self.end)
        if self.total_bytes is not None:
            upper_bounds.append(self.total_bytes - 1)
        return first_byte, min(upper_bounds)

    def _prepare_request(self):
        """Build the pieces of the next HTTP request without doing any I/O.

        Everything that can be done before the request without network I/O
        (or other I/O) happens here, following the `sans-I/O`_ philosophy.

        .. note:

            This method will be used multiple times, so ``headers`` will
            be mutated in between requests. However, we don't make a copy
            since the same keys are being updated.

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

            * HTTP verb for the request (always GET)
            * the URL for the request
            * the body of the request (always :data:`None`)
            * headers for the request

        Raises:
            ValueError: If the current download has finished.
            ValueError: If the current download is invalid.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Download has finished.")
        if self.invalid:
            raise ValueError("Download is invalid and cannot be re-used.")

        request_headers = self._headers
        range_start, range_end = self._get_byte_range()
        add_bytes_range(range_start, range_end, request_headers)
        return _GET, self.media_url, None, request_headers

    def _make_invalid(self):
        """Flag this download as invalid.

        Intended to be handed to helpers as a callback, so that they can
        mark this instance as invalid right before raising an exception.
        """
        self._invalid = True

    def _process_response(self, response):
        """Handle the HTTP response for one chunk.

        This is everything that must be done after a request that doesn't
        require network I/O, following the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to write
        a chunk to ``stream``. However, this will (almost) certainly not be
        network I/O.

        A 416 response whose ``content-range`` denotes a zero-byte resource
        simply finishes the download. Otherwise the state is updated after
        consuming the chunk: ``bytes_downloaded`` grows by the
        ``content-length`` value, or, when a ``transfer-encoding`` header is
        present, by the size of the range in ``content-range``.

        If ``total_bytes`` is already set, this assumes (but does not check)
        that we already have the correct value and doesn't bother to check
        that it agrees with the headers.

        We expect the **total** length to be in the ``content-range`` header,
        but this header is only present on requests which sent the ``range``
        header. This response header should be of the form
        ``bytes {start}-{end}/{total}`` and ``{end} - {start} + 1``
        should be the same as the ``Content-Length``.

        Args:
            response (object): The HTTP response object (need headers).

        Raises:
            ~google.cloud.storage.exceptions.InvalidResponse: If the number
                of bytes in the body doesn't match the content length header.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Special case first: nothing to download for a zero-byte resource.
        if _check_for_zero_content_range(
            response, self._get_status_code, self._get_headers
        ):
            self._finished = True
            return

        # Validate the response before mutating any progress state; helpers
        # receive ``_make_invalid`` so they can flag this instance on failure.
        _helpers.require_status_code(
            response,
            _ACCEPTABLE_STATUS_CODES,
            self._get_status_code,
            callback=self._make_invalid,
        )
        response_headers = self._get_headers(response)
        payload = self._get_body(response)

        range_start, range_end, reported_total = get_range_info(
            response, self._get_headers, callback=self._make_invalid
        )

        if response_headers.get("transfer-encoding") is None:
            declared_length = _helpers.header_required(
                response,
                "content-length",
                self._get_headers,
                callback=self._make_invalid,
            )
            chunk_length = int(declared_length)
            if len(payload) != chunk_length:
                self._make_invalid()
                raise InvalidResponse(
                    response,
                    "Response is different size than content-length",
                    "Expected",
                    chunk_length,
                    "Received",
                    len(payload),
                )
        else:
            # 'content-length' header not allowed with chunked encoding, so
            # derive the size from the reported range instead.
            chunk_length = range_end - range_start + 1

        # Record progress first.
        self._bytes_downloaded += chunk_length
        # The download is complete once the chunk reaches the requested
        # ``end`` or the last byte of the resource.
        reached_requested_end = self.end is not None and range_end >= self.end
        if reached_requested_end or range_end >= reported_total - 1:
            self._finished = True
        # NOTE: ``total_bytes`` is only taken from the response when it is
        # not already known.
        if self.total_bytes is None:
            self._total_bytes = reported_total
        # Finally hand the chunk to the output stream.
        self._stream.write(payload)

    def consume_next_chunk(self, transport, timeout=None):
        """Consume the next chunk of the resource to be downloaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

510 

511 

def add_bytes_range(start, end, headers):
    """Write a ``bytes=...`` range into a headers mapping.

    The mapping is modified in place; nothing is returned. When both
    ``start`` and ``end`` are :data:`None` the mapping is left untouched.

    Some possible inputs and the corresponding bytes ranges::

       >>> headers = {}
       >>> add_bytes_range(None, None, headers)
       >>> headers
       {}
       >>> add_bytes_range(500, 999, headers)
       >>> headers['range']
       'bytes=500-999'
       >>> add_bytes_range(None, 499, headers)
       >>> headers['range']
       'bytes=0-499'
       >>> add_bytes_range(-500, None, headers)
       >>> headers['range']
       'bytes=-500'
       >>> add_bytes_range(9500, None, headers)
       >>> headers['range']
       'bytes=9500-'

    Args:
        start (Optional[int]): The first byte in a range. Can be zero,
            positive, negative or :data:`None`.
        end (Optional[int]): The last byte in a range. Assumed to be
            positive.
        headers (Mapping[str, str]): A headers mapping which can have the
            bytes range added if at least one of ``start`` or ``end``
            is not :data:`None`.
    """
    # Nothing requested: leave the headers exactly as they are.
    if start is None and end is None:
        return

    if start is None:
        # Only an end was given: read from the beginning.
        # NOTE: This assumes ``end`` is non-negative.
        range_spec = "0-{:d}".format(end)
    elif end is not None:
        # Both bounds given.
        # NOTE: This is invalid if ``start < 0``.
        range_spec = "{:d}-{:d}".format(start, end)
    elif start < 0:
        # Negative start with no end: a suffix range (the minus sign comes
        # from formatting the negative number itself).
        range_spec = "{:d}".format(start)
    else:
        # Open-ended range from ``start`` to the end of the resource.
        range_spec = "{:d}-".format(start)

    headers[_helpers.RANGE_HEADER] = "bytes=" + range_spec

561 

562 

def get_range_info(response, get_headers, callback=_helpers.do_nothing):
    """Parse the start, end and total bytes out of a content range header.

    Args:
        response (object): An HTTP response object.
        get_headers (Callable[Any, Mapping[str, str]]): Helper to get headers
            from an HTTP response.
        callback (Optional[Callable]): A callback that takes no arguments,
            to be executed when an exception is being raised.

    Returns:
        Tuple[int, int, int]: The start byte, end byte and total bytes.

    Raises:
        ~google.cloud.storage.exceptions.InvalidResponse: If the
            ``Content-Range`` header is not of the form
            ``bytes {start}-{end}/{total}``.
    """
    header_value = _helpers.header_required(
        response,
        _helpers.CONTENT_RANGE_HEADER,
        get_headers,
        callback=callback,
    )
    parsed = _CONTENT_RANGE_RE.match(header_value)
    if parsed is not None:
        # Happy path: convert the three captured numbers, in order.
        return tuple(
            int(parsed.group(group_name))
            for group_name in ("start_byte", "end_byte", "total_bytes")
        )

    # Malformed header: notify the caller's hook before raising.
    callback()
    raise InvalidResponse(
        response,
        "Unexpected content-range header",
        header_value,
        'Expected to be of the form "bytes {start}-{end}/{total}"',
    )

599 

600 

def _check_for_zero_content_range(response, get_status_code, get_headers):
    """Tell whether a response is a 416 whose content range is zero.

    This is the special case for handling zero bytes files.

    Args:
        response (object): An HTTP response object.
        get_status_code (Callable[Any, int]): Helper to get a status code
            from a response.
        get_headers (Callable[Any, Mapping[str, str]]): Helper to get headers
            from an HTTP response.

    Returns:
        bool: True if the status code is 416 and the content range header
        equals ``bytes */0``, false otherwise.
    """
    is_unsatisfiable = (
        get_status_code(response) == http.client.REQUESTED_RANGE_NOT_SATISFIABLE
    )
    if not is_unsatisfiable:
        # Any other status is handled by the regular response processing.
        return False

    # NOTE(review): presumably ``header_required`` raises when the header is
    # absent -- confirm against ``_helpers``.
    content_range = _helpers.header_required(
        response,
        _helpers.CONTENT_RANGE_HEADER,
        get_headers,
        callback=_helpers.do_nothing,
    )
    return bool(content_range == _ZERO_CONTENT_RANGE_HEADER)