Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/resumable_media/_upload.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

365 statements  

1# Copyright 2017 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Virtual bases classes for uploading media via Google APIs. 

16 

17Supported here are: 

18 

19* simple (media) uploads 

20* multipart uploads that contain both metadata and a small file as payload 

21* resumable uploads (with metadata as well) 

22""" 

23 

24import http.client 

25import json 

26import os 

27import random 

28import re 

29import sys 

30import urllib.parse 

31 

32from google import resumable_media 

33from google.resumable_media import _helpers 

34from google.resumable_media import common 

35 

36from xml.etree import ElementTree 

37 

38 

# Lowercase header name used whenever a request content type is set.
_CONTENT_TYPE_HEADER = "content-type"
# ``Content-Range`` value when the total size is known: "bytes {start}-{end}/{total}".
_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}"
# ``Content-Range`` value when the total size is unknown (streaming upload).
_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*"
# ``Content-Range`` value for an empty chunk: only the total size is sent.
_EMPTY_RANGE_TEMPLATE = "bytes */{:d}"
# Width used to zero-pad random multipart boundary numbers below.
_BOUNDARY_WIDTH = len(str(sys.maxsize - 1))
_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH)
# Byte constants for assembling a multipart/related request body.
_MULTIPART_SEP = b"--"
_CRLF = b"\r\n"
_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n"
_RELATED_HEADER = b'multipart/related; boundary="'
# Parses the server's "range" header (e.g. "bytes=0-42") on 308 responses.
_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE)
# Error message when the stream position disagrees with bytes already uploaded.
_STREAM_ERROR_TEMPLATE = (
    "Bytes stream is in unexpected state. "
    "The local stream has had {:d} bytes read from it while "
    "{:d} bytes have already been updated (they should match)."
)
# Error message when more bytes were read than the declared total.
_STREAM_READ_PAST_TEMPLATE = (
    "{:d} bytes have been read from the stream, which exceeds the expected total {:d}."
)
# HTTP verbs used by the upload helpers.
_DELETE = "DELETE"
_POST = "POST"
_PUT = "PUT"
# Raised (via DataCorruption) when local and remote checksums disagree.
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the "
    "remote host, ``{}``, did not match."
)
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be validated."
)
_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response headers had no ``{}`` value; checksum could not be validated."
)
# Query-string fragments for the XML multipart-upload (MPU) API.
_MPU_INITIATE_QUERY = "?uploads"
_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}"
# XML namespace used by the S3-compatible MPU response documents.
_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}"
_UPLOAD_ID_NODE = "UploadId"
_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}"

76 

77 

class UploadBase(object):
    """Base class for upload helpers.

    Defines core shared behavior across different upload types.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None):
        self.upload_url = upload_url
        # Keep a reference to the caller's mapping (it may be mutated later
        # when content-type headers are added by subclasses).
        self._headers = {} if headers is None else headers
        self._finished = False
        self._retry_strategy = common.RetryStrategy()

    @property
    def finished(self):
        """bool: Flag indicating if the upload has completed."""
        return self._finished

    def _process_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Tombstone the upload first so it cannot be reused, whether the
        # status check below succeeds or raises.
        self._finished = True
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)

    @staticmethod
    def _get_status_code(response):
        """Access the status code from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Access the headers from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Access the response body from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
161 

162 

class SimpleUpload(UploadBase):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def _prepare_request(self, data, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note:

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        Args:
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type for the request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Guard clauses: a tombstoned upload cannot be reused, and the
        # payload must already be raw bytes (no implicit encoding).
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        # Record the content type on the shared headers mapping, then hand
        # back everything a transport needs to issue the request.
        self._headers[_CONTENT_TYPE_HEADER] = content_type
        return _POST, self.upload_url, data, self._headers

    def transmit(self, transport, data, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
237 

238 

class MultipartUpload(UploadBase):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5", "crc32c"
            and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, checksum=None):
        super(MultipartUpload, self).__init__(upload_url, headers=headers)
        self._checksum_type = checksum

    def _prepare_request(self, data, metadata, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note:

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        Args:
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        # When a checksum type was configured, compute the digest over the
        # full payload and inject it into the metadata so the server can
        # verify integrity. This overrides any manually-set checksum value.
        hasher = _helpers._get_checksum_object(self._checksum_type)
        if hasher is not None:
            hasher.update(data)
            local_digest = _helpers.prepare_checksum_digest(hasher.digest())
            key = _helpers._get_metadata_key(self._checksum_type)
            metadata[key] = local_digest

        # Build the multipart/related body and advertise its boundary in the
        # content-type header (header value is bytes, matching the boundary).
        body, boundary = construct_multipart_request(data, metadata, content_type)
        header_value = _RELATED_HEADER + boundary + b'"'
        self._headers[_CONTENT_TYPE_HEADER] = header_value

        return _POST, self.upload_url, body, self._headers

    def transmit(self, transport, data, metadata, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
340 

341 

class ResumableUpload(UploadBase):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be read
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def __init__(self, upload_url, chunk_size, checksum=None, headers=None):
        super(ResumableUpload, self).__init__(upload_url, headers=headers)
        if chunk_size % resumable_media.UPLOAD_CHUNK_SIZE != 0:
            raise ValueError(
                "{} KB must divide chunk size".format(
                    resumable_media.UPLOAD_CHUNK_SIZE / 1024
                )
            )
        self._chunk_size = chunk_size
        # Set by _prepare_initiate_request(); the source of bytes to upload.
        self._stream = None
        # Content type of the resource, recorded at initiate time.
        self._content_type = None
        # Bytes acknowledged by the server so far.
        self._bytes_uploaded = 0
        # Bytes already fed into the local checksum (may trail re-sent bytes).
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        # Lazily created in _update_checksum() the first time it is needed.
        self._checksum_object = None
        # Total upload size, if known; None for streams of unknown length.
        self._total_bytes = None
        # Session URL returned by the server after initiate().
        self._resumable_url = None
        # Set when a chunk request fails; cleared by recover().
        self._invalid = False

    @property
    def invalid(self):
        """bool: Indicates if the upload is in an invalid state.

        This will occur if a call to :meth:`transmit_next_chunk` fails.
        To recover from such a failure, call :meth:`recover`.
        """
        return self._invalid

    @property
    def chunk_size(self):
        """int: The size of each chunk used to upload the resource."""
        return self._chunk_size

    @property
    def resumable_url(self):
        """Optional[str]: The URL of the in-progress resumable upload."""
        return self._resumable_url

    @property
    def bytes_uploaded(self):
        """int: Number of bytes that have been uploaded."""
        return self._bytes_uploaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be uploaded.

        If this upload is initiated (via :meth:`initiate`) with
        ``stream_final=True``, this value will be populated based on the size
        of the ``stream`` being uploaded. (By default ``stream_final=True``.)

        If this upload is initiated with ``stream_final=False``,
        :attr:`total_bytes` will be :data:`None` since it cannot be
        determined from the stream.
        """
        return self._total_bytes

    def _prepare_initiate_request(
        self, stream, metadata, content_type, total_bytes=None, stream_final=True
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # Signed URL requires content type set directly - not through
        # x-upload-content-type.
        parse_result = urllib.parse.urlparse(self.upload_url)
        parsed_query = urllib.parse.parse_qs(parse_result.query)
        if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query:
            # Deconstruct **self._headers first so that content type defined
            # here takes priority.
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            # Deconstruct **self._headers first so that content type defined
            # here takes priority.
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }
        # Set the total bytes if possible.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Add the total bytes to the headers if set.
        if self._total_bytes is not None:
            content_length = "{:d}".format(self._total_bytes)
            headers["x-upload-content-length"] = content_length

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method takes the URL from the ``Location`` header and stores it
        for future use. Within that URL, we assume the ``upload_id`` query
        parameter has been included, but we do not check.

        Args:
            response (object): The HTTP response object (need headers).

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK, http.client.CREATED),
            self._get_status_code,
            callback=self._make_invalid,
        )
        self._resumable_url = _helpers.header_required(
            response, "location", self._get_headers
        )

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_request(self):
        """Prepare the contents of HTTP request to upload a chunk.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        a chunk from ``stream`` (via :func:`get_next_chunk`). However, this
        will (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        start_byte, payload, content_range = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        # The stream position must agree with what the server has already
        # acknowledged; otherwise the local state is inconsistent.
        if start_byte != self.bytes_uploaded:
            msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
            raise ValueError(msg)

        self._update_checksum(start_byte, payload)

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: self._content_type,
            _helpers.CONTENT_RANGE_HEADER: content_range,
        }
        return _PUT, self.resumable_url, payload, headers

    def _update_checksum(self, start_byte, payload):
        """Update the checksum with the payload if not already updated.

        Because error recovery can result in bytes being transmitted more than
        once, the checksum tracks the number of bytes checked in
        self._bytes_checksummed and skips bytes that have already been summed.
        """
        if not self._checksum_type:
            return

        if not self._checksum_object:
            self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

        if start_byte < self._bytes_checksummed:
            # Part of this payload was already checksummed (a resend after
            # recovery); only hash the tail that is new.
            offset = self._bytes_checksummed - start_byte
            data = payload[offset:]
        else:
            data = payload

        self._checksum_object.update(data)
        self._bytes_checksummed += len(data)

    def _make_invalid(self):
        """Simple setter for ``invalid``.

        This is intended to be passed along as a callback to helpers that
        raise an exception so they can mark this instance as invalid before
        raising.
        """
        self._invalid = True

    def _process_resumable_response(self, response, bytes_sent):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): The number of bytes sent in the request that
                ``response`` was returned for.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes 0-{end}``.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 308.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )
        if status_code == http.client.OK:
            # NOTE: We use the "local" information of ``bytes_sent`` to update
            #       ``bytes_uploaded``, but do not verify this against other
            #       state. However, there may be some other information:
            #
            #       * a ``size`` key in JSON response body
            #       * the ``total_bytes`` attribute (if set)
            #       * ``stream.tell()`` (relying on fact that ``initiate()``
            #         requires stream to be at the beginning)
            self._bytes_uploaded = self._bytes_uploaded + bytes_sent
            # Tombstone the current upload so it cannot be used again.
            self._finished = True
            # Validate the checksum. This can raise an exception on failure.
            self._validate_checksum(response)
        else:
            # 308: the server tells us (via the "range" header) how many
            # bytes it has actually persisted.
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the received metadata.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the checksum
            computed locally and the checksum reported by the remote host do
            not match.
        """
        if self._checksum_type is None:
            return
        metadata_key = _helpers._get_metadata_key(self._checksum_type)
        metadata = response.json()
        remote_checksum = metadata.get(metadata_key)
        if remote_checksum is None:
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

    def transmit_next_chunk(self, transport, timeout=None):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_recover_request(self):
        """Prepare the contents of HTTP request to recover from failure.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        We assume that the :attr:`resumable_url` is set (i.e. the only way
        the upload can end up :attr:`invalid` is if it has been initiated).

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

            The headers **do not** incorporate the ``_headers`` on the
            current instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, headers

    def _process_recover_response(self, response):
        """Process the response from an HTTP request to recover from failure.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 308.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes 0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        headers = self._get_headers(response)
        if _helpers.RANGE_HEADER in headers:
            bytes_range = headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1
        else:
            # In this case, the upload has not "begun".
            self._bytes_uploaded = 0

        # Rewind the local stream to match the server-acknowledged position
        # before allowing new chunks to be transmitted.
        self._stream.seek(self._bytes_uploaded)
        self._invalid = False

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (object): An object which can make authenticated
                requests.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")
868 

869 

class XMLMPUContainer(UploadBase):
    """Initiate and close an upload using the XML MPU API.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with this container object, the
    uploading of individual parts is handled separately, by XMLMPUPart objects
    spawned from this container class. The XMLMPUPart objects are not
    necessarily in the same process as the container, so they do not update the
    container automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        filename (str): The name (path) of the file to upload.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        upload_id (Optional[str]): The ID of an already-initiated upload;
            supply it to manage an MPU that was initiated elsewhere.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional(str)): The ID of the upload from the initialization
            response.
    """

    def __init__(self, upload_url, filename, headers=None, upload_id=None):
        super().__init__(upload_url, headers=headers)
        self._filename = filename
        self._upload_id = upload_id
        # Maps part number -> etag for every part registered so far.
        self._parts = {}

    @property
    def upload_id(self):
        """Optional[str]: The ID of the upload from the initialization response."""
        return self._upload_id

    def register_part(self, part_number, etag):
        """Register an uploaded part by part number and corresponding etag.

        XMLMPUPart objects represent individual parts, and their part number
        and etag can be registered to the container object with this method
        and therefore incorporated in the finalize() call to finish the upload.

        This method accepts part_number and etag, but not XMLMPUPart objects
        themselves, to reduce the complexity involved in running XMLMPUPart
        uploads in separate processes.

        Args:
            part_number (int): The part number. Parts are assembled into the
                final uploaded object with finalize() in order of their part
                numbers.
            etag (str): The etag included in the server response after upload.
        """
        self._parts[part_number] = etag

    def _prepare_initiate_request(self, content_type):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.upload_id is not None:
            raise ValueError("This upload has already been initiated.")

        initiate_url = self.upload_url + _MPU_INITIATE_QUERY

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: content_type,
        }
        return _POST, initiate_url, None, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method parses the XML response body and stores the upload ID
        found in its ``UploadId`` node for use in subsequent part uploads
        and finalization.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        root = ElementTree.fromstring(response.text)
        self._upload_id = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE).text

    def initiate(
        self,
        transport,
        content_type,
        timeout=None,
    ):
        """Initiate an MPU and record the upload ID.

        Args:
            transport (object): An object which can make authenticated
                requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_finalize_request(self):
        """Prepare the contents of an HTTP request to finalize the upload.

        All of the parts must be registered before calling this method.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        finalize_url = self.upload_url + final_query
        final_xml_root = ElementTree.Element("CompleteMultipartUpload")
        # Parts may be registered in any order because uploads run
        # concurrently, but the XML API requires the part list in ascending
        # part-number order, so sort before serializing.
        for part_number, etag in sorted(self._parts.items()):
            part = ElementTree.SubElement(final_xml_root, "Part")
            ElementTree.SubElement(part, "PartNumber").text = str(part_number)
            ElementTree.SubElement(part, "ETag").text = etag
        payload = ElementTree.tostring(final_xml_root)
        return _POST, finalize_url, payload, self._headers

    def _process_finalize_response(self, response):
        """Process the response from an HTTP request that finalized the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        self._finished = True

    def finalize(
        self,
        transport,
        timeout=None,
    ):
        """Finalize an MPU request with all the parts.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_cancel_request(self):
        """Prepare the contents of an HTTP request to cancel the upload.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always DELETE)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        cancel_url = self.upload_url + cancel_query
        return _DELETE, cancel_url, None, self._headers

    def _process_cancel_response(self, response):
        """Process the response from an HTTP request that canceled the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 204.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.NO_CONTENT,), self._get_status_code
        )

    def cancel(
        self,
        transport,
        timeout=None,
    ):
        """Cancel an MPU request and permanently delete any uploaded parts.

        This cannot be undone.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

1155 

1156 

class XMLMPUPart(UploadBase):
    """Upload a single part of an existing XML MPU container.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with the container object, the
    uploading of individual parts is handled separately by multiple objects
    of this class. Once a part is uploaded, it can be registered with the
    container with `container.register_part(part.part_number, part.etag)`.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. The request headers will be amended
            to include the computed value. Supported values are "md5", "crc32c"
            and None. The default is None.

    Attributes:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        etag (Optional(str)): The etag returned by the service after upload.
    """

    def __init__(
        self,
        upload_url,
        upload_id,
        filename,
        start,
        end,
        part_number,
        headers=None,
        checksum=None,
    ):
        super().__init__(upload_url, headers=headers)
        self._filename = filename
        self._start = start
        self._end = end
        self._upload_id = upload_id
        self._part_number = part_number
        self._etag = None
        self._checksum_type = checksum
        self._checksum_object = None

    @property
    def part_number(self):
        """int: The part number of this part within the MPU."""
        return self._part_number

    @property
    def upload_id(self):
        """str: The ID of the upload from the initialization response."""
        return self._upload_id

    @property
    def filename(self):
        """str: The name (path) of the file to upload."""
        return self._filename

    @property
    def etag(self):
        """Optional[str]: The etag returned by the service after upload."""
        return self._etag

    @property
    def start(self):
        """int: The byte index of the beginning of the part."""
        return self._start

    @property
    def end(self):
        """int: The byte index of the end of the part."""
        return self._end

    def _prepare_upload_request(self):
        """Prepare the contents of HTTP request to upload a part.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        the part's byte range from the file. However, this will (almost)
        certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("This part has already been uploaded.")

        # Read only this part's slice of the file.
        with open(self._filename, "br") as file_obj:
            file_obj.seek(self._start)
            payload = file_obj.read(self._end - self._start)

        checksum_object = _helpers._get_checksum_object(self._checksum_type)
        self._checksum_object = checksum_object
        if checksum_object is not None:
            checksum_object.update(payload)

        part_query = _MPU_PART_QUERY_TEMPLATE.format(
            part=self._part_number, upload_id=self._upload_id
        )
        return _PUT, self.upload_url + part_query, payload, self._headers

    def _process_upload_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Records the etag from the response and marks the part finished.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or the response is missing data.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK,),
            self._get_status_code,
        )
        self._validate_checksum(response)
        self._etag = _helpers.header_required(response, "etag", self._get_headers)
        self._finished = True

    def upload(
        self,
        transport,
        timeout=None,
    ):
        """Upload the part.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the response headers.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            # No checksum requested; nothing to verify.
            return

        remote_checksum = _helpers._get_uploaded_checksum_from_headers(
            response, self._get_headers, self._checksum_type
        )
        if remote_checksum is None:
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )

        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

1374 

1375 

def get_boundary():
    """Produce a random boundary for a multipart request.

    The boundary is derived from a random integer rendered through the
    module-level ``_BOUNDARY_FORMAT`` template and encoded as bytes.

    Returns:
        bytes: The boundary used to separate parts of a multipart request.
    """
    return _BOUNDARY_FORMAT.format(random.randrange(sys.maxsize)).encode("utf-8")

1387 

1388 

def construct_multipart_request(data, metadata, content_type):
    """Construct a multipart request body.

    Args:
        data (bytes): The resource content (UTF-8 encoded as bytes)
            to be uploaded.
        metadata (Mapping[str, str]): The resource metadata, such as an
            ACL list.
        content_type (str): The content type of the resource, e.g. a JPEG
            image has content type ``image/jpeg``.

    Returns:
        Tuple[bytes, bytes]: The multipart request body and the boundary used
        between each part.
    """
    boundary = get_boundary()
    separator = _MULTIPART_SEP + boundary
    metadata_bytes = json.dumps(metadata).encode("utf-8")
    # Assemble the payload: a JSON metadata part followed by the media part,
    # each introduced by the boundary separator, with a closing separator.
    body = b"".join(
        [
            separator,
            _MULTIPART_BEGIN,
            metadata_bytes,
            _CRLF,
            separator,
            _CRLF,
            b"content-type: ",
            content_type.encode("utf-8"),
            _CRLF,
            _CRLF,  # Empty line between headers and body.
            data,
            _CRLF,
            separator,
            _MULTIPART_SEP,
        ]
    )
    return body, boundary

1428 

1429 

def get_total_bytes(stream):
    """Determine the total number of bytes in a stream.

    The stream's position is saved, the stream is seeked to its end to find
    the size, and then the original position is restored.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).

    Returns:
        int: The number of bytes.
    """
    initial_position = stream.tell()
    stream.seek(0, os.SEEK_END)
    size = stream.tell()
    # Restore the caller's position in the stream.
    stream.seek(initial_position)
    return size

1448 

1449 

def get_next_chunk(stream, chunk_size, total_bytes):
    """Get a chunk from an I/O stream.

    The ``stream`` may have fewer bytes remaining than ``chunk_size``
    so it may not always be the case that
    ``end_byte == start_byte + chunk_size - 1``.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).
        chunk_size (int): The size of the chunk to be read from the ``stream``.
        total_bytes (Optional[int]): The (expected) total number of bytes
            in the ``stream``.

    Returns:
        Tuple[int, bytes, str]: Triple of:

          * the start byte index
          * the content in between the start and end bytes (inclusive)
          * content range header for the chunk (slice) that has been read

    Raises:
        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields
            non-empty content.
        ValueError: If there is no data left to consume. This corresponds
            exactly to the case ``end_byte < start_byte``, which can only
            occur if ``end_byte == start_byte - 1``.
    """
    start_byte = stream.tell()
    if total_bytes is not None and 0 < total_bytes <= start_byte + chunk_size:
        # This read would reach (or pass) the known end: only read up to it.
        payload = stream.read(total_bytes - start_byte)
    else:
        payload = stream.read(chunk_size)
    end_byte = stream.tell() - 1

    num_bytes_read = len(payload)
    if total_bytes is None:
        if num_bytes_read < chunk_size:
            # A short read means the stream is exhausted, so we now **KNOW**
            # the total number of bytes.
            total_bytes = end_byte + 1
    elif total_bytes == 0:
        # NOTE: We also expect ``start_byte == 0`` here but don't check
        #       because ``_prepare_initiate_request()`` requires the
        #       stream to be at the beginning.
        if num_bytes_read != 0:
            raise ValueError(
                "Stream specified as empty, but produced non-empty content."
            )
    elif num_bytes_read == 0:
        raise ValueError(
            "Stream is already exhausted. There is no content remaining."
        )

    content_range = get_content_range(start_byte, end_byte, total_bytes)
    return start_byte, payload, content_range

1505 

1506 

def get_content_range(start_byte, end_byte, total_bytes):
    """Convert start, end and total into content range header.

    If ``total_bytes`` is not known, uses "bytes {start}-{end}/*".
    If we are dealing with an empty range (i.e. ``end_byte < start_byte``)
    then "bytes */{total}" is used.

    This function **ASSUMES** that if the size is not known, the caller will
    not also pass an empty range.

    Args:
        start_byte (int): The start (inclusive) of the byte range.
        end_byte (int): The end (inclusive) of the byte range.
        total_bytes (Optional[int]): The number of bytes in the byte
            range (if known).

    Returns:
        str: The content range header.
    """
    if total_bytes is None:
        return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte)
    if end_byte < start_byte:
        # Empty range: report only the total size.
        return _EMPTY_RANGE_TEMPLATE.format(total_bytes)
    return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)