Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/resumable_media/_upload.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

364 statements  

1# Copyright 2017 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Virtual bases classes for uploading media via Google APIs. 

16 

17Supported here are: 

18 

19* simple (media) uploads 

20* multipart uploads that contain both metadata and a small file as payload 

21* resumable uploads (with metadata as well) 

22""" 

23 

24import http.client 

25import json 

26import os 

27import random 

28import re 

29import sys 

30import urllib.parse 

31 

32from google import resumable_media 

33from google.resumable_media import _helpers 

34from google.resumable_media import common 

35 

36from xml.etree import ElementTree 

37 

38 

# Header key used when setting the request content type.
_CONTENT_TYPE_HEADER = "content-type"
# ``Content-Range`` value templates: known start/end/total, unknown total,
# and the "no payload bytes, total only" form.
_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}"
_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*"
_EMPTY_RANGE_TEMPLATE = "bytes */{:d}"
# Width of the zero-padded number embedded in a multipart boundary; sized
# from ``sys.maxsize`` so any generated value fits.
_BOUNDARY_WIDTH = len(str(sys.maxsize - 1))
_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH)
# Byte-string building blocks for assembling a multipart/related body.
_MULTIPART_SEP = b"--"
_CRLF = b"\r\n"
_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n"
_RELATED_HEADER = b'multipart/related; boundary="'
# Parses a server-reported "range" header of the form ``bytes=0-{end}``.
_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE)
_STREAM_ERROR_TEMPLATE = (
    "Bytes stream is in unexpected state. "
    "The local stream has had {:d} bytes read from it while "
    "{:d} bytes have already been updated (they should match)."
)
_STREAM_READ_PAST_TEMPLATE = (
    "{:d} bytes have been read from the stream, which exceeds "
    "the expected total {:d}."
)
# HTTP verbs used by the upload helpers.
_DELETE = "DELETE"
_POST = "POST"
_PUT = "PUT"
# Error messages for checksum validation after an upload completes.
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the "
    "remote host, ``{}``, did not match."
)
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be validated."
)
_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response headers had no ``{}`` value; checksum could not be validated."
)
# Query-string fragments for the XML multipart-upload (MPU) API.
_MPU_INITIATE_QUERY = "?uploads"
_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}"
# XML namespace used by the S3-compatible MPU responses.
_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}"
# Tag name of the node carrying the upload ID in the MPU initiate response.
_UPLOAD_ID_NODE = "UploadId"
_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}"

77 

78 

class UploadBase(object):
    """Shared machinery for the upload helper classes.

    Holds the target URL, the extra request headers, a "finished" flag, and
    the retry strategy common to every upload type.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None):
        self.upload_url = upload_url
        # Keep the caller's mapping when one is supplied so that header
        # additions made later remain visible to the caller.
        self._headers = {} if headers is None else headers
        self._finished = False
        self._retry_strategy = common.RetryStrategy()

    @property
    def finished(self):
        """bool: Flag indicating if the upload has completed."""
        return self._finished

    def _process_response(self, response):
        """Check the outcome of an already-transmitted HTTP request.

        No network (or other) I/O happens here; this follows the
        `sans-I/O`_ philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # Mark the upload used before validating, so it cannot be re-used
        # regardless of whether the status check below succeeds or raises.
        self._finished = True
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)

    @staticmethod
    def _get_status_code(response):
        """Access the status code from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Access the headers from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Access the response body from an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

162 

163 

class SimpleUpload(UploadBase):
    """Upload a resource to a Google API.

    A **simple** media upload carries no metadata and finishes in a single
    request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def _prepare_request(self, data, content_type):
        """Assemble the verb, URL, body and headers for the upload request.

        Performs no network (or other) I/O itself; this follows the
        `sans-I/O`_ philosophy — the caller transmits the result.

        .. note:

            This method is intended for a single use, so ``headers`` will
            be mutated by inserting the content type.

        Args:
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type for the request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        headers = self._headers
        headers[_CONTENT_TYPE_HEADER] = content_type
        return _POST, self.upload_url, data, headers

    def transmit(self, transport, data, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

238 

239 

class MultipartUpload(UploadBase):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends the metadata and the resource content
    together in one (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5", "crc32c"
            and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, checksum=None):
        super(MultipartUpload, self).__init__(upload_url, headers=headers)
        self._checksum_type = checksum

    def _prepare_request(self, data, metadata, content_type):
        """Assemble the verb, URL, body and headers for the upload request.

        Performs no network (or other) I/O itself; this follows the
        `sans-I/O`_ philosophy — the caller transmits the result.

        .. note:

            This method is intended for a single use, so ``headers`` will
            be mutated by inserting the multipart content type.

        Args:
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")
        if not isinstance(data, bytes):
            raise TypeError("`data` must be bytes, received", type(data))

        # When a checksum type was configured, compute the digest over the
        # payload and record it in the metadata (overriding any manual value).
        checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if checksum_object is not None:
            checksum_object.update(data)
            digest = _helpers.prepare_checksum_digest(checksum_object.digest())
            metadata[_helpers._get_metadata_key(self._checksum_type)] = digest

        payload, boundary = construct_multipart_request(data, metadata, content_type)
        self._headers[_CONTENT_TYPE_HEADER] = _RELATED_HEADER + boundary + b'"'

        return _POST, self.upload_url, payload, self._headers

    def transmit(self, transport, data, metadata, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

341 

342 

class ResumableUpload(UploadBase):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be read
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def __init__(self, upload_url, chunk_size, checksum=None, headers=None):
        super(ResumableUpload, self).__init__(upload_url, headers=headers)
        if chunk_size % resumable_media.UPLOAD_CHUNK_SIZE != 0:
            raise ValueError(
                "{} KB must divide chunk size".format(
                    resumable_media.UPLOAD_CHUNK_SIZE / 1024
                )
            )
        self._chunk_size = chunk_size
        # Stream and content type are captured by ``_prepare_initiate_request``.
        self._stream = None
        self._content_type = None
        # Bytes confirmed by the server vs. bytes folded into the running
        # checksum; these can differ after a retransmission (see
        # ``_update_checksum``).
        self._bytes_uploaded = 0
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        self._checksum_object = None
        self._total_bytes = None
        # Session URL assigned by the server during ``initiate``.
        self._resumable_url = None
        self._invalid = False

    @property
    def invalid(self):
        """bool: Indicates if the upload is in an invalid state.

        This will occur if a call to :meth:`transmit_next_chunk` fails.
        To recover from such a failure, call :meth:`recover`.
        """
        return self._invalid

    @property
    def chunk_size(self):
        """int: The size of each chunk used to upload the resource."""
        return self._chunk_size

    @property
    def resumable_url(self):
        """Optional[str]: The URL of the in-progress resumable upload."""
        return self._resumable_url

    @property
    def bytes_uploaded(self):
        """int: Number of bytes that have been uploaded."""
        return self._bytes_uploaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be uploaded.

        If this upload is initiated (via :meth:`initiate`) with
        ``stream_final=True``, this value will be populated based on the size
        of the ``stream`` being uploaded. (By default ``stream_final=True``.)

        If this upload is initiated with ``stream_final=False``,
        :attr:`total_bytes` will be :data:`None` since it cannot be
        determined from the stream.
        """
        return self._total_bytes

    def _prepare_initiate_request(
        self, stream, metadata, content_type, total_bytes=None, stream_final=True
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # Signed URL requires content type set directly - not through x-upload-content-type
        parse_result = urllib.parse.urlparse(self.upload_url)
        parsed_query = urllib.parse.parse_qs(parse_result.query)
        if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query:
            # Deconstruct **self._headers first so that content type defined here takes priority
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            # Deconstruct **self._headers first so that content type defined here takes priority
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }
        # Set the total bytes if possible.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Add the total bytes to the headers if set.
        if self._total_bytes is not None:
            content_length = "{:d}".format(self._total_bytes)
            headers["x-upload-content-length"] = content_length

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method takes the URL from the ``Location`` header and stores it
        for future use. Within that URL, we assume the ``upload_id`` query
        parameter has been included, but we do not check.

        Args:
            response (object): The HTTP response object (need headers).

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK, http.client.CREATED),
            self._get_status_code,
            callback=self._make_invalid,
        )
        self._resumable_url = _helpers.header_required(
            response, "location", self._get_headers
        )

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_request(self):
        """Prepare the contents of HTTP request to upload a chunk.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        a chunk from ``stream`` (via :func:`get_next_chunk`). However, this
        will (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        start_byte, payload, content_range = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        # The stream position must agree with the server-acknowledged byte
        # count; a mismatch means local and remote state have diverged.
        if start_byte != self.bytes_uploaded:
            msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
            raise ValueError(msg)

        self._update_checksum(start_byte, payload)

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: self._content_type,
            _helpers.CONTENT_RANGE_HEADER: content_range,
        }
        return _PUT, self.resumable_url, payload, headers

    def _update_checksum(self, start_byte, payload):
        """Update the checksum with the payload if not already updated.

        Because error recovery can result in bytes being transmitted more than
        once, the checksum tracks the number of bytes checked in
        self._bytes_checksummed and skips bytes that have already been summed.
        """
        if not self._checksum_type:
            return

        if not self._checksum_object:
            self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

        if start_byte < self._bytes_checksummed:
            # Retransmission: only the tail of the payload beyond what has
            # already been checksummed gets folded in.
            offset = self._bytes_checksummed - start_byte
            data = payload[offset:]
        else:
            data = payload

        self._checksum_object.update(data)
        self._bytes_checksummed += len(data)

    def _make_invalid(self):
        """Simple setter for ``invalid``.

        This is intended to be passed along as a callback to helpers that
        raise an exception so they can mark this instance as invalid before
        raising.
        """
        self._invalid = True

    def _process_resumable_response(self, response, bytes_sent):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): The number of bytes sent in the request that
                ``response`` was returned for.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes 0-{end}``.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 308.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )
        if status_code == http.client.OK:
            # NOTE: We use the "local" information of ``bytes_sent`` to update
            #       ``bytes_uploaded``, but do not verify this against other
            #       state. However, there may be some other information:
            #
            #       * a ``size`` key in JSON response body
            #       * the ``total_bytes`` attribute (if set)
            #       * ``stream.tell()`` (relying on fact that ``initiate()``
            #         requires stream to be at the beginning)
            self._bytes_uploaded = self._bytes_uploaded + bytes_sent
            # Tombstone the current upload so it cannot be used again.
            self._finished = True
            # Validate the checksum. This can raise an exception on failure.
            self._validate_checksum(response)
        else:
            # 308: the server reports how much it has via the "range" header.
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the received metadata.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the checksum
            computed locally and the checksum reported by the remote host do
            not match.
        """
        if self._checksum_type is None:
            return
        metadata_key = _helpers._get_metadata_key(self._checksum_type)
        metadata = response.json()
        remote_checksum = metadata.get(metadata_key)
        if remote_checksum is None:
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

    def transmit_next_chunk(self, transport, timeout=None):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_recover_request(self):
        """Prepare the contents of HTTP request to recover from failure.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        We assume that the :attr:`resumable_url` is set (i.e. the only way
        the upload can end up :attr:`invalid` is if it has been initiated).

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

            The headers **do not** incorporate the ``_headers`` on the
            current instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # "bytes */*" asks the server to report progress without sending data.
        headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, headers

    def _process_recover_response(self, response):
        """Process the response from an HTTP request to recover from failure.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 308.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes 0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        headers = self._get_headers(response)
        if _helpers.RANGE_HEADER in headers:
            bytes_range = headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1
        else:
            # In this case, the upload has not "begun".
            self._bytes_uploaded = 0

        # Rewind the stream to the server-confirmed position before resuming.
        self._stream.seek(self._bytes_uploaded)
        self._invalid = False

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (object): An object which can make authenticated
                requests.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

869 

870 

class XMLMPUContainer(UploadBase):
    """Initiate and close an upload using the XML MPU API.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with this container object, the
    uploading of individual parts is handled separately, by XMLMPUPart objects
    spawned from this container class. The XMLMPUPart objects are not
    necessarily in the same process as the container, so they do not update the
    container automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        filename (str): The name (path) of the file to upload.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        upload_id (Optional[str]): The ID of an already-initiated upload to
            continue, if any.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional(str)): The ID of the upload from the initialization
            response.
    """

    def __init__(self, upload_url, filename, headers=None, upload_id=None):
        super().__init__(upload_url, headers=headers)
        self._filename = filename
        self._upload_id = upload_id
        # Maps part number -> etag for every part registered so far.
        self._parts = {}

    @property
    def upload_id(self):
        """Optional[str]: The upload ID, or :data:`None` before initiation."""
        return self._upload_id

    def register_part(self, part_number, etag):
        """Register an uploaded part by part number and corresponding etag.

        XMLMPUPart objects represent individual parts, and their part number
        and etag can be registered to the container object with this method
        and therefore incorporated in the finalize() call to finish the upload.

        This method accepts part_number and etag, but not XMLMPUPart objects
        themselves, to reduce the complexity involved in running XMLMPUPart
        uploads in separate processes.

        Args:
            part_number (int): The part number. Parts are assembled into the
                final uploaded object with finalize() in order of their part
                numbers.
            etag (str): The etag included in the server response after upload.
        """
        self._parts[part_number] = etag

    def _prepare_initiate_request(self, content_type):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

        Raises:
            ValueError: If the current upload has already been initiated.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.upload_id is not None:
            raise ValueError("This upload has already been initiated.")

        initiate_url = self.upload_url + _MPU_INITIATE_QUERY

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: content_type,
        }
        return _POST, initiate_url, None, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method parses the XML response body and stores the value of its
        ``UploadId`` node for use in subsequent part, finalize and cancel
        requests.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        root = ElementTree.fromstring(response.text)
        self._upload_id = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE).text

    def initiate(
        self,
        transport,
        content_type,
        timeout=None,
    ):
        """Initiate an MPU and record the upload ID.

        Args:
            transport (object): An object which can make authenticated
                requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_finalize_request(self):
        """Prepare the contents of an HTTP request to finalize the upload.

        All of the parts must be registered before calling this method.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        finalize_url = self.upload_url + final_query
        final_xml_root = ElementTree.Element("CompleteMultipartUpload")
        # The XML MPU API requires the parts to be listed in ascending order
        # of part number, but parts may have been registered out of order
        # (e.g. when uploaded concurrently), so sort before serializing.
        for part_number, etag in sorted(self._parts.items()):
            part = ElementTree.SubElement(final_xml_root, "Part")
            ElementTree.SubElement(part, "PartNumber").text = str(part_number)
            ElementTree.SubElement(part, "ETag").text = etag
        payload = ElementTree.tostring(final_xml_root)
        return _POST, finalize_url, payload, self._headers

    def _process_finalize_response(self, response):
        """Process the response from an HTTP request that finalized the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
        self._finished = True

    def finalize(
        self,
        transport,
        timeout=None,
    ):
        """Finalize an MPU request with all the parts.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_cancel_request(self):
        """Prepare the contents of an HTTP request to cancel the upload.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always DELETE)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

        Raises:
            ValueError: If the upload has not been initiated.
        """
        if self.upload_id is None:
            raise ValueError("This upload has not yet been initiated.")

        cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id)
        cancel_url = self.upload_url + cancel_query
        return _DELETE, cancel_url, None, self._headers

    def _process_cancel_response(self, response):
        """Process the response from an HTTP request that canceled the upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 204.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """

        _helpers.require_status_code(
            response, (http.client.NO_CONTENT,), self._get_status_code
        )

    def cancel(
        self,
        transport,
        timeout=None,
    ):
        """Cancel an MPU request and permanently delete any uploaded parts.

        This cannot be undone.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

1156 

1157 

class XMLMPUPart(UploadBase):
    """Upload one numbered part belonging to an existing XML MPU container.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    To avoid concurrency issues with the container object, each part is
    uploaded by a separate instance of this class. After a part finishes,
    register it on the container with
    ``container.register_part(part.part_number, part.etag)``.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" is used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with every request.
        checksum (Optional([str])): The type of checksum to compute to verify
            the integrity of the object. The request headers will be amended
            to include the computed value. Supported values are "md5", "crc32c"
            and None. The default is None.

    Attributes:
        upload_url (str): The URL of the object (without query parameters).
        upload_id (str): The ID of the upload from the initialization response.
        filename (str): The name (path) of the file to upload.
        start (int): The byte index of the beginning of the part.
        end (int): The byte index of the end of the part.
        part_number (int): The part number. Part numbers will be assembled in
            sequential order when the container is finalized.
        etag (Optional(str)): The etag returned by the service after upload.
    """

    def __init__(
        self,
        upload_url,
        upload_id,
        filename,
        start,
        end,
        part_number,
        headers=None,
        checksum=None,
    ):
        super().__init__(upload_url, headers=headers)
        self._upload_id = upload_id
        self._filename = filename
        self._part_number = part_number
        # Byte range [start, end) of the file covered by this part.
        self._start = start
        self._end = end
        # Checksum configuration; the hash object is created lazily when the
        # request payload is prepared.
        self._checksum_type = checksum
        self._checksum_object = None
        # Set from the server response once the part upload succeeds.
        self._etag = None

    @property
    def part_number(self):
        """int: The part number of this part within the MPU."""
        return self._part_number

    @property
    def upload_id(self):
        """str: The ID of the upload this part belongs to."""
        return self._upload_id

    @property
    def filename(self):
        """str: The name (path) of the file being uploaded."""
        return self._filename

    @property
    def etag(self):
        """Optional[str]: The etag from the server, or None before upload."""
        return self._etag

    @property
    def start(self):
        """int: The byte index of the beginning of the part."""
        return self._start

    @property
    def end(self):
        """int: The byte index of the end of the part."""
        return self._end

    def _prepare_upload_request(self):
        """Prepare the contents of an HTTP request to upload this part.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        the part's slice of the file. However, this will (almost) certainly
        not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers incorporate the ``_headers`` on the current instance.

        Raises:
            ValueError: If the current upload has finished.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("This part has already been uploaded.")

        # Read only this part's slice of the file.
        with open(self._filename, "br") as file_obj:
            file_obj.seek(self._start)
            payload = file_obj.read(self._end - self._start)

        # Compute the local checksum now so it can be compared against the
        # server's value when the response is processed.
        self._checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if self._checksum_object is not None:
            self._checksum_object.update(payload)

        part_query = _MPU_PART_QUERY_TEMPLATE.format(
            part=self._part_number, upload_id=self._upload_id
        )
        return _PUT, self.upload_url + part_query, payload, self._headers

    def _process_upload_response(self, response):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or the response is missing data.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK,),
            self._get_status_code,
        )
        self._validate_checksum(response)
        # The etag is required: the container needs it for finalization.
        self._etag = _helpers.header_required(response, "etag", self._get_headers)
        self._finished = True

    def upload(
        self,
        transport,
        timeout=None,
    ):
        """Upload the part.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _validate_checksum(self, response):
        """Compare the locally computed checksum against the response headers.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host
                do not match.
        """
        if self._checksum_type is None:
            # No checksum was requested; nothing to verify.
            return

        server_checksum = _helpers._get_uploaded_checksum_from_headers(
            response, self._get_headers, self._checksum_type
        )
        if server_checksum is None:
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )

        computed_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if computed_checksum != server_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), computed_checksum, server_checksum
                ),
            )

1375 

1376 

def get_boundary():
    """Generate a random boundary for a multipart request.

    Returns:
        bytes: The boundary used to separate parts of a multipart request.
    """
    # The template is a unicode string (bytes had no % / .format in older
    # Python 3), so format first and encode afterwards.
    random_token = random.randrange(sys.maxsize)
    return _BOUNDARY_FORMAT.format(random_token).encode("utf-8")

1388 

1389 

def construct_multipart_request(data, metadata, content_type):
    """Construct a multipart request body.

    Args:
        data (bytes): The resource content (UTF-8 encoded as bytes)
            to be uploaded.
        metadata (Mapping[str, str]): The resource metadata, such as an
            ACL list.
        content_type (str): The content type of the resource, e.g. a JPEG
            image has content type ``image/jpeg``.

    Returns:
        Tuple[bytes, bytes]: The multipart request body and the boundary used
        between each part.
    """
    multipart_boundary = get_boundary()
    boundary_sep = _MULTIPART_SEP + multipart_boundary
    metadata_bytes = json.dumps(metadata).encode("utf-8")
    content_type_bytes = content_type.encode("utf-8")
    # Assemble the two parts (JSON metadata, then media) into one payload.
    content = b"".join(
        [
            boundary_sep,
            _MULTIPART_BEGIN,
            metadata_bytes,
            _CRLF,
            boundary_sep,
            _CRLF,
            b"content-type: ",
            content_type_bytes,
            _CRLF,
            _CRLF,  # Empty line between headers and body.
            data,
            _CRLF,
            boundary_sep,
            _MULTIPART_SEP,
        ]
    )

    return content, multipart_boundary

1429 

1430 

def get_total_bytes(stream):
    """Determine the total number of bytes in a stream.

    The stream's position is left unchanged.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).

    Returns:
        int: The number of bytes.
    """
    saved_position = stream.tell()
    # Jump to the end to learn the size, then restore the original offset.
    stream.seek(0, os.SEEK_END)
    size = stream.tell()
    stream.seek(saved_position)

    return size

1449 

1450 

def get_next_chunk(stream, chunk_size, total_bytes):
    """Read the next chunk from an I/O stream.

    The ``stream`` may have fewer bytes remaining than ``chunk_size``
    so it may not always be the case that
    ``end_byte == start_byte + chunk_size - 1``.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).
        chunk_size (int): The size of the chunk to be read from the ``stream``.
        total_bytes (Optional[int]): The (expected) total number of bytes
            in the ``stream``.

    Returns:
        Tuple[int, bytes, str]: Triple of:

          * the start byte index
          * the content in between the start and end bytes (inclusive)
          * content range header for the chunk (slice) that has been read

    Raises:
        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields
            non-empty content.
        ValueError: If there is no data left to consume. This corresponds
            exactly to the case ``end_byte < start_byte``, which can only
            occur if ``end_byte == start_byte - 1``.
    """
    start_byte = stream.tell()
    if total_bytes is not None and 0 < total_bytes <= start_byte + chunk_size:
        # Don't read past the expected end of the stream.
        payload = stream.read(total_bytes - start_byte)
    else:
        payload = stream.read(chunk_size)
    end_byte = stream.tell() - 1

    bytes_read = len(payload)
    if total_bytes is None:
        if bytes_read < chunk_size:
            # A short read means the stream is exhausted, so now we **KNOW**
            # the total number of bytes.
            total_bytes = end_byte + 1
    elif total_bytes == 0:
        # NOTE: ``start_byte == 0`` is also expected here but not checked
        #       because ``_prepare_initiate_request()`` requires the
        #       stream to be at the beginning.
        if bytes_read != 0:
            raise ValueError(
                "Stream specified as empty, but produced non-empty content."
            )
    elif bytes_read == 0:
        raise ValueError(
            "Stream is already exhausted. There is no content remaining."
        )

    content_range = get_content_range(start_byte, end_byte, total_bytes)
    return start_byte, payload, content_range

1506 

1507 

def get_content_range(start_byte, end_byte, total_bytes):
    """Convert start, end and total into a content range header value.

    If ``total_bytes`` is not known, uses "bytes {start}-{end}/*".
    If we are dealing with an empty range (i.e. ``end_byte < start_byte``)
    then "bytes */{total}" is used.

    This function **ASSUMES** that if the size is not known, the caller will
    not also pass an empty range.

    Args:
        start_byte (int): The start (inclusive) of the byte range.
        end_byte (int): The end (inclusive) of the byte range.
        total_bytes (Optional[int]): The number of bytes in the byte
            range (if known).

    Returns:
        str: The content range header.
    """
    # Guard clauses: unknown size, then empty range, then the general case.
    if total_bytes is None:
        return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte)
    if end_byte < start_byte:
        return _EMPTY_RANGE_TEMPLATE.format(total_bytes)
    return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)