Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/resumable_media/_upload.py: 30%

247 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1# Copyright 2017 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

"""Virtual base classes for uploading media via Google APIs.

Supported here are:

* simple (media) uploads
* multipart uploads that contain both metadata and a small file as payload
* resumable uploads (with metadata as well)
"""

23 

24import http.client 

25import json 

26import os 

27import random 

28import re 

29import sys 

30import urllib.parse 

31 

32from google import resumable_media 

33from google.resumable_media import _helpers 

34from google.resumable_media import common 

35 

36 

# --- HTTP header names and ``Content-Range`` value templates -----------------
_CONTENT_TYPE_HEADER = "content-type"
_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}"  # start, end, total
_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*"  # start, end; total unknown
_EMPTY_RANGE_TEMPLATE = "bytes */{:d}"  # total only

# --- Multipart body construction ---------------------------------------------
# Boundaries are random integers below ``sys.maxsize``, zero-padded to the
# digit count of ``sys.maxsize - 1`` so every boundary has the same length.
_BOUNDARY_WIDTH = len(f"{sys.maxsize - 1}")
_BOUNDARY_FORMAT = f"==============={{:0{_BOUNDARY_WIDTH:d}d}}=="
_MULTIPART_SEP = b"--"
_CRLF = b"\r\n"
_MULTIPART_BEGIN = (
    _CRLF + b"content-type: application/json; charset=UTF-8" + _CRLF + _CRLF
)
_RELATED_HEADER = b'multipart/related; boundary="'

# --- Resumable upload progress -----------------------------------------------
# Parses the ``range`` header reported by the server, e.g. ``bytes=0-1048575``.
_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", re.IGNORECASE)
_STREAM_ERROR_TEMPLATE = (
    "Bytes stream is in unexpected state. The local stream has had {:d} "
    "bytes read from it while {:d} bytes have already been updated "
    "(they should match)."
)
_STREAM_READ_PAST_TEMPLATE = (
    "{:d} bytes have been read from the stream, which exceeds the "
    "expected total {:d}."
)

# --- HTTP verbs ---------------------------------------------------------------
_POST = "POST"
_PUT = "PUT"

# --- Checksum validation messages --------------------------------------------
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by "
    "the remote host, ``{}``, did not match."
)
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be "
    "validated."
)

66 

67 

class UploadBase(object):
    """Shared state and behavior for every upload helper.

    Stores the target URL, the caller-supplied headers, a "finished" flag
    and a retry strategy that concrete upload classes build upon.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None):
        self.upload_url = upload_url
        # Stored as-is (not copied); subclasses may add keys to this mapping.
        self._headers = {} if headers is None else headers
        self._finished = False
        self._retry_strategy = common.RetryStrategy()

    @property
    def finished(self):
        """bool: ``True`` once the upload has completed (successfully or not)."""
        return self._finished

    def _process_response(self, response):
        """Handle the HTTP response of the upload request.

        Covers all post-request work that needs no network (or other) I/O,
        following the `sans-I/O`_ approach.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        # The upload is tombstoned before the status check, so neither a
        # failed nor a successful upload object can be reused.
        self._finished = True
        _helpers.require_status_code(
            response, (http.client.OK,), self._get_status_code
        )

    @staticmethod
    def _get_status_code(response):
        """Return the status code of an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always; transport-specific subclasses
                must override this.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_headers(response):
        """Return the headers of an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always; transport-specific subclasses
                must override this.
        """
        raise NotImplementedError("This implementation is virtual.")

    @staticmethod
    def _get_body(response):
        """Return the body of an HTTP response.

        Args:
            response (object): The HTTP response object.

        Raises:
            NotImplementedError: Always; transport-specific subclasses
                must override this.
        """
        raise NotImplementedError("This implementation is virtual.")

151 

152 

class SimpleUpload(UploadBase):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def _prepare_request(self, data, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note::

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it.

        Args:
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type for the request.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")

        if not isinstance(data, bytes):
            # Format the type into one message: passing it as a second
            # positional argument would make ``str(exc)`` render a tuple.
            raise TypeError(
                "`data` must be bytes, received {}".format(type(data))
            )
        # ``self._headers`` is the mapping given to ``__init__`` (not a copy),
        # so the caller's dict also gains the content-type key.
        self._headers[_CONTENT_TYPE_HEADER] = content_type
        return _POST, self.upload_url, data, self._headers

    def transmit(self, transport, data, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

227 

228 

class MultipartUpload(UploadBase):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5", "crc32c"
            and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def __init__(self, upload_url, headers=None, checksum=None):
        super(MultipartUpload, self).__init__(upload_url, headers=headers)
        self._checksum_type = checksum

    def _prepare_request(self, data, metadata, content_type):
        """Prepare the contents of an HTTP request.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        .. note::

            This method will be used only once, so ``headers`` will be
            mutated by having a new key added to it. When a checksum type
            was configured, ``metadata`` is also modified in place: the
            computed checksum is stored in it before it is serialized.

        Args:
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request
              * headers for the request

        Raises:
            ValueError: If the current upload has already finished.
            TypeError: If ``data`` isn't bytes.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("An upload can only be used once.")

        if not isinstance(data, bytes):
            # Format the type into one message: passing it as a second
            # positional argument would make ``str(exc)`` render a tuple.
            raise TypeError(
                "`data` must be bytes, received {}".format(type(data))
            )

        checksum_object = _helpers._get_checksum_object(self._checksum_type)
        if checksum_object is not None:
            checksum_object.update(data)
            actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
            metadata_key = _helpers._get_metadata_key(self._checksum_type)
            # Overrides any checksum value the caller put in ``metadata``.
            metadata[metadata_key] = actual_checksum

        content, multipart_boundary = construct_multipart_request(
            data, metadata, content_type
        )
        # NOTE: this header value is bytes, since the boundary is bytes.
        multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"'
        self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type

        return _POST, self.upload_url, content, self._headers

    def transmit(self, transport, data, metadata, content_type, timeout=None):
        """Transmit the resource to be uploaded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

330 

331 

class ResumableUpload(UploadBase):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be read
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. They are also merged into every chunk request
            built by ``_prepare_request``, but they **are not** sent with
            the recovery request built by ``_prepare_recover_request``.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def __init__(self, upload_url, chunk_size, checksum=None, headers=None):
        super(ResumableUpload, self).__init__(upload_url, headers=headers)
        if chunk_size % resumable_media.UPLOAD_CHUNK_SIZE != 0:
            raise ValueError(
                "{} KB must divide chunk size".format(
                    resumable_media.UPLOAD_CHUNK_SIZE / 1024
                )
            )
        self._chunk_size = chunk_size
        # Set when the initiate request is prepared.
        self._stream = None
        self._content_type = None
        # Bytes the server has confirmed vs. bytes fed to the local checksum;
        # the latter may exceed the former after a recovery rewinds progress.
        self._bytes_uploaded = 0
        self._bytes_checksummed = 0
        self._checksum_type = checksum
        # Created lazily on the first chunk (see ``_update_checksum``).
        self._checksum_object = None
        self._total_bytes = None
        # Taken from the ``Location`` header of the initiate response.
        self._resumable_url = None
        self._invalid = False

    @property
    def invalid(self):
        """bool: Indicates if the upload is in an invalid state.

        This will occur if a call to :meth:`transmit_next_chunk` fails.
        To recover from such a failure, call :meth:`recover`.
        """
        return self._invalid

    @property
    def chunk_size(self):
        """int: The size of each chunk used to upload the resource."""
        return self._chunk_size

    @property
    def resumable_url(self):
        """Optional[str]: The URL of the in-progress resumable upload."""
        return self._resumable_url

    @property
    def bytes_uploaded(self):
        """int: Number of bytes that have been uploaded."""
        return self._bytes_uploaded

    @property
    def total_bytes(self):
        """Optional[int]: The total number of bytes to be uploaded.

        If this upload is initiated (via :meth:`initiate`) with
        ``stream_final=True``, this value will be populated based on the size
        of the ``stream`` being uploaded. (By default ``stream_final=True``.)

        If this upload is initiated with ``stream_final=False``,
        :attr:`total_bytes` will be :data:`None` since it cannot be
        determined from the stream.
        """
        return self._total_bytes

    def _prepare_initiate_request(
        self, stream, metadata, content_type, total_bytes=None, stream_final=True
    ):
        """Prepare the contents of HTTP request to initiate upload.

        This is everything that must be done before a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always POST)
              * the URL for the request
              * the body of the request (the JSON-encoded ``metadata``)
              * headers for the request (a new dict; ``_headers`` is not
                modified)

        Raises:
            ValueError: If the current upload has already been initiated.
            ValueError: If ``stream`` is not at the beginning.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.resumable_url is not None:
            raise ValueError("This upload has already been initiated.")
        if stream.tell() != 0:
            raise ValueError("Stream must be at beginning.")

        self._stream = stream
        self._content_type = content_type

        # A signed URL requires the content type to be set directly, not
        # through ``x-upload-content-type``.
        parse_result = urllib.parse.urlparse(self.upload_url)
        parsed_query = urllib.parse.parse_qs(parse_result.query)
        if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query:
            # Unpack ``self._headers`` first so the content type set here wins.
            headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type}
        else:
            # Unpack ``self._headers`` first so the values set here win.
            headers = {
                **self._headers,
                _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8",
                "x-upload-content-type": content_type,
            }
        # Set the total bytes if possible.
        if total_bytes is not None:
            self._total_bytes = total_bytes
        elif stream_final:
            self._total_bytes = get_total_bytes(stream)
        # Add the total bytes to the headers if set.
        if self._total_bytes is not None:
            content_length = "{:d}".format(self._total_bytes)
            headers["x-upload-content-length"] = content_length

        payload = json.dumps(metadata).encode("utf-8")
        return _POST, self.upload_url, payload, headers

    def _process_initiate_response(self, response):
        """Process the response from an HTTP request that initiated upload.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        This method takes the URL from the ``Location`` header and stores it
        for future use. Within that URL, we assume the ``upload_id`` query
        parameter has been included, but we do not check.

        Args:
            response (object): The HTTP response object (need headers).

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 201 (the upload is marked invalid first),
                or if the ``location`` header is missing.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response,
            (http.client.OK, http.client.CREATED),
            self._get_status_code,
            callback=self._make_invalid,
        )
        self._resumable_url = _helpers.header_required(
            response, "location", self._get_headers
        )

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=None,
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (object): An object which can make authenticated
                requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_request(self):
        """Prepare the contents of HTTP request to upload a chunk.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        For the time being, this **does require** some form of I/O to read
        a chunk from ``stream`` (via :func:`get_next_chunk`). However, this
        will (almost) certainly not be network I/O.

        Returns:
            Tuple[str, str, bytes, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request
              * headers for the request

            The headers start from a copy of the ``_headers`` on the current
            instance; the content-type and content-range headers computed
            here take priority over any same-named entries in ``_headers``.

        Raises:
            ValueError: If the current upload has finished.
            ValueError: If the current upload is in an invalid state.
            ValueError: If the current upload has not been initiated.
            ValueError: If the location in the stream (i.e. ``stream.tell()``)
                does not agree with ``bytes_uploaded``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        if self.finished:
            raise ValueError("Upload has finished.")
        if self.invalid:
            raise ValueError(
                "Upload is in an invalid state. To recover call `recover()`."
            )
        if self.resumable_url is None:
            raise ValueError(
                "This upload has not been initiated. Please call "
                "initiate() before beginning to transmit chunks."
            )

        start_byte, payload, content_range = get_next_chunk(
            self._stream, self._chunk_size, self._total_bytes
        )
        if start_byte != self.bytes_uploaded:
            msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
            raise ValueError(msg)

        self._update_checksum(start_byte, payload)

        headers = {
            **self._headers,
            _CONTENT_TYPE_HEADER: self._content_type,
            _helpers.CONTENT_RANGE_HEADER: content_range,
        }
        return _PUT, self.resumable_url, payload, headers

    def _update_checksum(self, start_byte, payload):
        """Update the checksum with the payload if not already updated.

        Because error recovery can result in bytes being transmitted more than
        once, the checksum tracks the number of bytes checked in
        self._bytes_checksummed and skips bytes that have already been summed.

        Args:
            start_byte (int): Offset of ``payload`` within the whole upload.
            payload (bytes): The chunk about to be transmitted.
        """
        if not self._checksum_type:
            return

        # Explicit ``is None``: the checksum object's truthiness is not part
        # of its contract, so it must not decide whether to create it.
        if self._checksum_object is None:
            self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

        if start_byte < self._bytes_checksummed:
            # Part (or all) of this chunk was summed on an earlier attempt;
            # only feed the bytes past ``_bytes_checksummed``.
            offset = self._bytes_checksummed - start_byte
            data = payload[offset:]
        else:
            data = payload

        self._checksum_object.update(data)
        self._bytes_checksummed += len(data)

    def _make_invalid(self):
        """Simple setter for ``invalid``.

        This is intended to be passed along as a callback to helpers that
        raise an exception so they can mark this instance as invalid before
        raising.
        """
        self._invalid = True

    def _process_resumable_response(self, response, bytes_sent):
        """Process the response from an HTTP request.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        Args:
            response (object): The HTTP response object.
            bytes_sent (int): The number of bytes sent in the request that
                ``response`` was returned for.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is missing or not of
                the form ``bytes=0-{end}``.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 308.
            ~google.resumable_media.common.DataCorruption: If the status
                code is 200 and checksum validation fails.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        status_code = _helpers.require_status_code(
            response,
            (http.client.OK, http.client.PERMANENT_REDIRECT),
            self._get_status_code,
            callback=self._make_invalid,
        )
        if status_code == http.client.OK:
            # NOTE: We use the "local" information of ``bytes_sent`` to update
            #       ``bytes_uploaded``, but do not verify this against other
            #       state. However, there may be some other information:
            #
            #       * a ``size`` key in JSON response body
            #       * the ``total_bytes`` attribute (if set)
            #       * ``stream.tell()`` (relying on fact that ``initiate()``
            #         requires stream to be at the beginning)
            self._bytes_uploaded = self._bytes_uploaded + bytes_sent
            # Tombstone the current upload so it cannot be used again.
            self._finished = True
            # Validate the checksum. This can raise an exception on failure.
            self._validate_checksum(response)
        else:
            bytes_range = _helpers.header_required(
                response,
                _helpers.RANGE_HEADER,
                self._get_headers,
                callback=self._make_invalid,
            )
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                self._make_invalid()
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            # The range end is inclusive, hence the ``+ 1``.
            self._bytes_uploaded = int(match.group("end_byte")) + 1

    def _validate_checksum(self, response):
        """Check the computed checksum, if any, against the response headers.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If a checksum
                type is configured but the response metadata has no value
                for the corresponding key.
            ~google.resumable_media.common.DataCorruption: If the checksum
                computed locally and the checksum reported by the remote host do
                not match.
        """
        if self._checksum_type is None:
            return
        metadata_key = _helpers._get_metadata_key(self._checksum_type)
        metadata = response.json()
        remote_checksum = metadata.get(metadata_key)
        if remote_checksum is None:
            raise common.InvalidResponse(
                response,
                _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
                self._get_headers(response),
            )
        local_checksum = _helpers.prepare_checksum_digest(
            self._checksum_object.digest()
        )
        if local_checksum != remote_checksum:
            raise common.DataCorruption(
                response,
                _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
                    self._checksum_type.upper(), local_checksum, remote_checksum
                ),
            )

    def transmit_next_chunk(self, transport, timeout=None):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

    def _prepare_recover_request(self):
        """Prepare the contents of HTTP request to recover from failure.

        This is everything that must be done before a request that doesn't
        require network I/O. This is based on the `sans-I/O`_ philosophy.

        We assume that the :attr:`resumable_url` is set (i.e. the only way
        the upload can end up :attr:`invalid` is if it has been initiated).

        Returns:
            Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple

              * HTTP verb for the request (always PUT)
              * the URL for the request
              * the body of the request (always :data:`None`)
              * headers for the request

            The headers **do not** incorporate the ``_headers`` on the
            current instance.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"}
        return _PUT, self.resumable_url, None, headers

    def _process_recover_response(self, response):
        """Process the response from an HTTP request to recover from failure.

        This is everything that must be done after a request that doesn't
        require network I/O (or other I/O). This is based on the `sans-I/O`_
        philosophy.

        On success the stream is rewound to :attr:`bytes_uploaded` and the
        upload is marked valid again.

        Args:
            response (object): The HTTP response object.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 308.
            ~google.resumable_media.common.InvalidResponse: If the status
                code is 308 and the ``range`` header is not of the form
                ``bytes=0-{end}``.

        .. _sans-I/O: https://sans-io.readthedocs.io/
        """
        _helpers.require_status_code(
            response, (http.client.PERMANENT_REDIRECT,), self._get_status_code
        )
        headers = self._get_headers(response)
        if _helpers.RANGE_HEADER in headers:
            bytes_range = headers[_helpers.RANGE_HEADER]
            match = _BYTES_RANGE_RE.match(bytes_range)
            if match is None:
                raise common.InvalidResponse(
                    response,
                    'Unexpected "range" header',
                    bytes_range,
                    'Expected to be of the form "bytes=0-{end}"',
                )
            self._bytes_uploaded = int(match.group("end_byte")) + 1
        else:
            # In this case, the upload has not "begun".
            self._bytes_uploaded = 0

        self._stream.seek(self._bytes_uploaded)
        self._invalid = False

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (object): An object which can make authenticated
                requests.

        Raises:
            NotImplementedError: Always, since virtual.
        """
        raise NotImplementedError("This implementation is virtual.")

861 

862 

def get_boundary():
    """Get a random boundary for a multipart request.

    The boundary is ``_BOUNDARY_FORMAT`` filled in with a random integer
    drawn from ``[0, sys.maxsize)`` and then encoded as UTF-8.

    Returns:
        bytes: The boundary used to separate parts of a multipart request.
    """
    # The template is a text string, so format first and encode afterwards.
    return _BOUNDARY_FORMAT.format(random.randrange(sys.maxsize)).encode("utf-8")

874 

875 

def construct_multipart_request(data, metadata, content_type):
    """Construct a multipart request body.

    The body has two parts, each introduced by a freshly generated boundary:

    * the JSON-serialized ``metadata`` (headers come from ``_MULTIPART_BEGIN``)
    * the raw ``data``, labelled with ``content_type``

    The body ends with the closing delimiter (the boundary followed by ``--``).

    Args:
        data (bytes): The resource content (UTF-8 encoded as bytes)
            to be uploaded.
        metadata (Mapping[str, str]): The resource metadata, such as an
            ACL list.
        content_type (str): The content type of the resource, e.g. a JPEG
            image has content type ``image/jpeg``.

    Returns:
        Tuple[bytes, bytes]: The multipart request body and the boundary used
        between each part.
    """
    multipart_boundary = get_boundary()
    json_bytes = json.dumps(metadata).encode("utf-8")
    content_type_bytes = content_type.encode("utf-8")
    boundary_sep = _MULTIPART_SEP + multipart_boundary

    # A single join copies the (possibly large) ``data`` payload once. A
    # chain of ``+`` would copy it again for every concatenation after it.
    content = b"".join(
        (
            # Part 1: metadata as JSON.
            boundary_sep,
            _MULTIPART_BEGIN,
            json_bytes,
            _CRLF,
            # Part 2: the resource content.
            boundary_sep,
            _CRLF,
            b"content-type: ",
            content_type_bytes,
            _CRLF,
            _CRLF,  # Empty line between headers and body.
            data,
            _CRLF,
            # Closing delimiter.
            boundary_sep,
            _MULTIPART_SEP,
        )
    )

    return content, multipart_boundary

915 

916 

def get_total_bytes(stream):
    """Determine the total number of bytes in a stream.

    The stream is measured by seeking to its end. Its original position is
    restored before returning.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).

    Returns:
        int: The number of bytes.
    """
    original_offset = stream.tell()
    stream.seek(0, os.SEEK_END)
    # Ask ``.tell()`` for the end offset instead of relying on the value
    # ``.seek()`` returns. Historically (e.g. Python 2 ``file`` objects)
    # that return value was not the new position.
    size = stream.tell()
    stream.seek(original_offset)
    return size

935 

936 

def get_next_chunk(stream, chunk_size, total_bytes):
    """Get a chunk from an I/O stream.

    The ``stream`` may have fewer bytes remaining than ``chunk_size``
    so it may not always be the case that
    ``end_byte == start_byte + chunk_size - 1``.

    Args:
        stream (IO[bytes]): The stream (i.e. file-like object).
        chunk_size (int): The size of the chunk to be read from the ``stream``.
        total_bytes (Optional[int]): The (expected) total number of bytes
            in the ``stream``.

    Returns:
        Tuple[int, bytes, str]: Triple of:

          * the start byte index
          * the content in between the start and end bytes (inclusive)
          * content range header for the chunk (slice) that has been read

    Raises:
        ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields
            non-empty content.
        ValueError: If there is no data left to consume. This corresponds
            exactly to the case ``end_byte < start_byte``, which can only
            occur if ``end_byte == start_byte - 1``. A stream already
            positioned at or past a known (non-zero) ``total_bytes`` falls
            into this case.
    """
    start_byte = stream.tell()
    if total_bytes is not None and start_byte + chunk_size >= total_bytes > 0:
        # Last chunk of a stream of known size: read only up to
        # ``total_bytes``. Clamp at zero because ``read()`` treats a negative
        # size as "read to EOF". That would overrun ``total_bytes`` if the
        # stream is positioned past it.
        payload = stream.read(max(total_bytes - start_byte, 0))
    else:
        payload = stream.read(chunk_size)
    end_byte = stream.tell() - 1

    num_bytes_read = len(payload)
    if total_bytes is None:
        if num_bytes_read < chunk_size:
            # A short read is treated as end-of-stream, so we now **KNOW**
            # the total number of bytes.
            total_bytes = end_byte + 1
    elif total_bytes == 0:
        # NOTE: We also expect ``start_byte == 0`` here but don't check
        #       because ``_prepare_initiate_request()`` requires the
        #       stream to be at the beginning.
        if num_bytes_read != 0:
            raise ValueError(
                "Stream specified as empty, but produced non-empty content."
            )
    elif num_bytes_read == 0:
        raise ValueError(
            "Stream is already exhausted. There is no content remaining."
        )

    content_range = get_content_range(start_byte, end_byte, total_bytes)
    return start_byte, payload, content_range

992 

993 

def get_content_range(start_byte, end_byte, total_bytes):
    """Convert start, end and total into content range header.

    Three shapes are produced:

    * size unknown (``total_bytes is None``): "bytes {start}-{end}/*"
    * empty range (``end_byte < start_byte``): "bytes */{total}"
    * otherwise: "bytes {start}-{end}/{total}"

    This function **ASSUMES** that if the size is not known, the caller will
    not also pass an empty range.

    Args:
        start_byte (int): The start (inclusive) of the byte range.
        end_byte (int): The end (inclusive) of the byte range.
        total_bytes (Optional[int]): The number of bytes in the byte
            range (if known).

    Returns:
        str: The content range header.
    """
    if total_bytes is not None:
        if start_byte <= end_byte:
            return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)
        # Nothing is sent in this request; only the total size is reported.
        return _EMPTY_RANGE_TEMPLATE.format(total_bytes)
    # Size not yet known; the range is assumed to be non-empty here.
    return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte)