Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/storage/_media/_upload.py: 31%


1# Copyright 2017 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Virtual base classes for uploading media via Google APIs.

16 

17Supported here are: 

18 

19* simple (media) uploads 

20* multipart uploads that contain both metadata and a small file as payload 

21* resumable uploads (with metadata as well) 

22""" 

23 

24import http.client 

25import json 

26import os 

27import random 

28import re 

29import sys 

30import urllib.parse 

31 

32from google.cloud.storage._media import _helpers 

33from google.cloud.storage._media import UPLOAD_CHUNK_SIZE 

34from google.cloud.storage.exceptions import InvalidResponse 

35from google.cloud.storage.exceptions import DataCorruption 

36from google.cloud.storage.retry import DEFAULT_RETRY 

37 

38from xml.etree import ElementTree 

39 

40 

41_CONTENT_TYPE_HEADER = "content-type" 

42_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}" 

43_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*" 

44_EMPTY_RANGE_TEMPLATE = "bytes */{:d}" 

45_BOUNDARY_WIDTH = len(str(sys.maxsize - 1)) 

46_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH) 

47_MULTIPART_SEP = b"--" 

48_CRLF = b"\r\n" 

49_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n" 

50_RELATED_HEADER = b'multipart/related; boundary="' 

51_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE) 

52_STREAM_ERROR_TEMPLATE = ( 

53 "Bytes stream is in unexpected state. " 

54 "The local stream has had {:d} bytes read from it while " 

55 "{:d} bytes have already been updated (they should match)." 

56) 

57_STREAM_READ_PAST_TEMPLATE = ( 

58 "{:d} bytes have been read from the stream, which exceeds " 

59 "the expected total {:d}." 

60) 

61_DELETE = "DELETE" 

62_POST = "POST" 

63_PUT = "PUT" 

64_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = ( 

65 "The computed ``{}`` checksum, ``{}``, and the checksum reported by the " 

66 "remote host, ``{}``, did not match." 

67) 

68_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = ( 

69 "Response metadata had no ``{}`` value; checksum could not be validated." 

70) 

71_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = ( 

72 "Response headers had no ``{}`` value; checksum could not be validated." 

73) 

74_MPU_INITIATE_QUERY = "?uploads" 

75_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}" 

76_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}" 

77_UPLOAD_ID_NODE = "UploadId" 

78_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}" 

79 

80 

81class UploadBase(object): 

82 """Base class for upload helpers. 

83 

84 Defines core shared behavior across different upload types. 

85 

86 Args: 

87 upload_url (str): The URL where the content will be uploaded. 

88 headers (Optional[Mapping[str, str]]): Extra headers that should 

89 be sent with the request, e.g. headers for encrypted data. 

90 retry (Optional[google.api_core.retry.Retry]): How to retry the 

91 RPC. A None value will disable retries. A 

92 google.api_core.retry.Retry value will enable retries, and the 

93 object will configure backoff and timeout options. 

94 

95 See the retry.py source code and docstrings in this package 

96 (google.cloud.storage.retry) for information on retry types and how 

97 to configure them. 

98 

99 Attributes: 

100 upload_url (str): The URL where the content will be uploaded. 

101 """ 

102 

103 def __init__(self, upload_url, headers=None, retry=DEFAULT_RETRY): 

104 self.upload_url = upload_url 

105 if headers is None: 

106 headers = {} 

107 self._headers = headers 

108 self._finished = False 

109 self._retry_strategy = retry 

110 

111 @property 

112 def finished(self): 

113 """bool: Flag indicating if the upload has completed.""" 

114 return self._finished 

115 

116 def _process_response(self, response): 

117 """Process the response from an HTTP request. 

118 

119 This is everything that must be done after a request that doesn't 

120 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

121 philosophy. 

122 

123 Args: 

124 response (object): The HTTP response object. 

125 

126 Raises: 

127 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

128 code is not 200. 

129 

130 .. _sans-I/O: https://sans-io.readthedocs.io/ 

131 """ 

132 # Tombstone the current upload so it cannot be used again (in either 

133 # failure or success). 

134 self._finished = True 

135 _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 

136 

137 @staticmethod 

138 def _get_status_code(response): 

139 """Access the status code from an HTTP response. 

140 

141 Args: 

142 response (object): The HTTP response object. 

143 

144 Raises: 

145 NotImplementedError: Always, since virtual. 

146 """ 

147 raise NotImplementedError("This implementation is virtual.") 

148 

149 @staticmethod 

150 def _get_headers(response): 

151 """Access the headers from an HTTP response. 

152 

153 Args: 

154 response (object): The HTTP response object. 

155 

156 Raises: 

157 NotImplementedError: Always, since virtual. 

158 """ 

159 raise NotImplementedError("This implementation is virtual.") 

160 

161 @staticmethod 

162 def _get_body(response): 

163 """Access the response body from an HTTP response. 

164 

165 Args: 

166 response (object): The HTTP response object. 

167 

168 Raises: 

169 NotImplementedError: Always, since virtual. 

170 """ 

171 raise NotImplementedError("This implementation is virtual.") 

172 

173 

174class SimpleUpload(UploadBase): 

175 """Upload a resource to a Google API. 

176 

177 A **simple** media upload sends no metadata and completes the upload 

178 in a single request. 

179 

180 Args: 

181 upload_url (str): The URL where the content will be uploaded. 

182 headers (Optional[Mapping[str, str]]): Extra headers that should 

183 be sent with the request, e.g. headers for encrypted data. 

184 retry (Optional[google.api_core.retry.Retry]): How to retry the 

185 RPC. A None value will disable retries. A 

186 google.api_core.retry.Retry value will enable retries, and the 

187 object will configure backoff and timeout options. 

188 

189 See the retry.py source code and docstrings in this package 

190 (google.cloud.storage.retry) for information on retry types and how 

191 to configure them. 

192 

193 Attributes: 

194 upload_url (str): The URL where the content will be uploaded. 

195 """ 

196 

197 def _prepare_request(self, data, content_type): 

198 """Prepare the contents of an HTTP request. 

199 

200 This is everything that must be done before a request that doesn't 

201 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

202 philosophy. 

203 

204 .. note::

205 

206 This method will be used only once, so ``headers`` will be 

207 mutated by having a new key added to it. 

208 

209 Args: 

210 data (bytes): The resource content to be uploaded. 

211 content_type (str): The content type for the request. 

212 

213 Returns: 

214 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

215 

216 * HTTP verb for the request (always POST) 

217 * the URL for the request 

218 * the body of the request 

219 * headers for the request 

220 

221 Raises: 

222 ValueError: If the current upload has already finished. 

223 TypeError: If ``data`` isn't bytes. 

224 

225 .. _sans-I/O: https://sans-io.readthedocs.io/ 

226 """ 

227 if self.finished: 

228 raise ValueError("An upload can only be used once.") 

229 

230 if not isinstance(data, bytes): 

231 raise TypeError("`data` must be bytes, received", type(data)) 

232 self._headers[_CONTENT_TYPE_HEADER] = content_type 

233 return _POST, self.upload_url, data, self._headers 

234 

235 def transmit(self, transport, data, content_type, timeout=None): 

236 """Transmit the resource to be uploaded. 

237 

238 Args: 

239 transport (object): An object which can make authenticated 

240 requests. 

241 data (bytes): The resource content to be uploaded. 

242 content_type (str): The content type of the resource, e.g. a JPEG 

243 image has content type ``image/jpeg``. 

244 timeout (Optional[Union[float, Tuple[float, float]]]): 

245 The number of seconds to wait for the server response. 

246 Depending on the retry strategy, a request may be repeated 

247 several times using the same timeout each time. 

248 

249 Can also be passed as a tuple (connect_timeout, read_timeout). 

250 See :meth:`requests.Session.request` documentation for details. 

251 

252 Raises: 

253 NotImplementedError: Always, since virtual. 

254 """ 

255 raise NotImplementedError("This implementation is virtual.") 

256 

257 

258class MultipartUpload(UploadBase): 

259 """Upload a resource with metadata to a Google API. 

260 

261 A **multipart** upload sends both metadata and the resource in a single 

262 (multipart) request. 

263 

264 Args: 

265 upload_url (str): The URL where the content will be uploaded. 

266 headers (Optional[Mapping[str, str]]): Extra headers that should 

267 be sent with the request, e.g. headers for encrypted data. 

268 checksum (Optional[str]): The type of checksum to compute to verify

269 the integrity of the object. The request metadata will be amended 

270 to include the computed value. Using this option will override a 

271 manually-set checksum value. Supported values are "md5", 

272 "crc32c", "auto", and None. The default is "auto", which will try 

273 to detect if the C extension for crc32c is installed and fall back 

274 to md5 otherwise. 

275 retry (Optional[google.api_core.retry.Retry]): How to retry the 

276 RPC. A None value will disable retries. A 

277 google.api_core.retry.Retry value will enable retries, and the 

278 object will configure backoff and timeout options. 

279 

280 See the retry.py source code and docstrings in this package 

281 (google.cloud.storage.retry) for information on retry types and how 

282 to configure them. 

283 

284 Attributes: 

285 upload_url (str): The URL where the content will be uploaded. 

286 """ 

287 

288 def __init__(self, upload_url, headers=None, checksum="auto", retry=DEFAULT_RETRY): 

289 super(MultipartUpload, self).__init__(upload_url, headers=headers, retry=retry) 

290 self._checksum_type = checksum 

291 if self._checksum_type == "auto": 

292 self._checksum_type = ( 

293 "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 

294 ) 

295 

296 def _prepare_request(self, data, metadata, content_type): 

297 """Prepare the contents of an HTTP request. 

298 

299 This is everything that must be done before a request that doesn't 

300 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

301 philosophy. 

302 

303 .. note::

304 

305 This method will be used only once, so ``headers`` will be 

306 mutated by having a new key added to it. 

307 

308 Args: 

309 data (bytes): The resource content to be uploaded. 

310 metadata (Mapping[str, str]): The resource metadata, such as an 

311 ACL list. 

312 content_type (str): The content type of the resource, e.g. a JPEG 

313 image has content type ``image/jpeg``. 

314 

315 Returns: 

316 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

317 

318 * HTTP verb for the request (always POST) 

319 * the URL for the request 

320 * the body of the request 

321 * headers for the request 

322 

323 Raises: 

324 ValueError: If the current upload has already finished. 

325 TypeError: If ``data`` isn't bytes. 

326 

327 .. _sans-I/O: https://sans-io.readthedocs.io/ 

328 """ 

329 if self.finished: 

330 raise ValueError("An upload can only be used once.") 

331 

332 if not isinstance(data, bytes): 

333 raise TypeError("`data` must be bytes, received", type(data)) 

334 

335 checksum_object = _helpers._get_checksum_object(self._checksum_type) 

336 if checksum_object is not None: 

337 checksum_object.update(data) 

338 actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest()) 

339 metadata_key = _helpers._get_metadata_key(self._checksum_type) 

340 metadata[metadata_key] = actual_checksum 

341 

342 content, multipart_boundary = construct_multipart_request( 

343 data, metadata, content_type 

344 ) 

345 multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"' 

346 self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type 

347 

348 return _POST, self.upload_url, content, self._headers 

349 

350 def transmit(self, transport, data, metadata, content_type, timeout=None): 

351 """Transmit the resource to be uploaded. 

352 

353 Args: 

354 transport (object): An object which can make authenticated 

355 requests. 

356 data (bytes): The resource content to be uploaded. 

357 metadata (Mapping[str, str]): The resource metadata, such as an 

358 ACL list. 

359 content_type (str): The content type of the resource, e.g. a JPEG 

360 image has content type ``image/jpeg``. 

361 timeout (Optional[Union[float, Tuple[float, float]]]): 

362 The number of seconds to wait for the server response. 

363 Depending on the retry strategy, a request may be repeated 

364 several times using the same timeout each time. 

365 

366 Can also be passed as a tuple (connect_timeout, read_timeout). 

367 See :meth:`requests.Session.request` documentation for details. 

368 

369 Raises: 

370 NotImplementedError: Always, since virtual. 

371 """ 

372 raise NotImplementedError("This implementation is virtual.") 

373 

374 

375class ResumableUpload(UploadBase): 

376 """Initiate and fulfill a resumable upload to a Google API. 

377 

378 A **resumable** upload sends an initial request with the resource metadata 

379 and then gets assigned an upload ID / upload URL to send bytes to. 

380 Using the upload URL, the upload is then done in chunks (determined by 

381 the user) until all bytes have been uploaded. 

382 

383 Args: 

384 upload_url (str): The URL where the resumable upload will be initiated. 

385 chunk_size (int): The size of each chunk used to upload the resource. 

386 headers (Optional[Mapping[str, str]]): Extra headers that should 

387 be sent with every request. 

388 checksum (Optional[str]): The type of checksum to compute to verify

389 the integrity of the object. After the upload is complete, the 

390 server-computed checksum of the resulting object will be checked 

391 and google.cloud.storage.exceptions.DataCorruption will be raised on 

392 a mismatch. The corrupted file will not be deleted from the remote 

393 host automatically. Supported values are "md5", "crc32c", "auto", 

394 and None. The default is "auto", which will try to detect if the C 

395 extension for crc32c is installed and fall back to md5 otherwise. 

396 retry (Optional[google.api_core.retry.Retry]): How to retry the 

397 RPC. A None value will disable retries. A 

398 google.api_core.retry.Retry value will enable retries, and the 

399 object will configure backoff and timeout options. 

400 

401 See the retry.py source code and docstrings in this package 

402 (google.cloud.storage.retry) for information on retry types and how 

403 to configure them. 

404 

405 Attributes: 

406 upload_url (str): The URL where the content will be uploaded. 

407 

408 Raises: 

409 ValueError: If ``chunk_size`` is not a multiple of 

410 :data:`.UPLOAD_CHUNK_SIZE`. 

411 """ 

412 

413 def __init__( 

414 self, 

415 upload_url, 

416 chunk_size, 

417 checksum="auto", 

418 headers=None, 

419 retry=DEFAULT_RETRY, 

420 ): 

421 super(ResumableUpload, self).__init__(upload_url, headers=headers, retry=retry) 

422 if chunk_size % UPLOAD_CHUNK_SIZE != 0: 

423 raise ValueError( 

424 "{} KB must divide chunk size".format(UPLOAD_CHUNK_SIZE / 1024) 

425 ) 

426 self._chunk_size = chunk_size 

427 self._stream = None 

428 self._content_type = None 

429 self._bytes_uploaded = 0 

430 self._bytes_checksummed = 0 

431 self._checksum_type = checksum 

432 if self._checksum_type == "auto": 

433 self._checksum_type = ( 

434 "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 

435 ) 

436 self._checksum_object = None 

437 self._total_bytes = None 

438 self._resumable_url = None 

439 self._invalid = False 

440 

441 @property 

442 def invalid(self): 

443 """bool: Indicates if the upload is in an invalid state. 

444 

445 This will occur if a call to :meth:`transmit_next_chunk` fails. 

446 To recover from such a failure, call :meth:`recover`. 

447 """ 

448 return self._invalid 

449 

450 @property 

451 def chunk_size(self): 

452 """int: The size of each chunk used to upload the resource.""" 

453 return self._chunk_size 

454 

455 @property 

456 def resumable_url(self): 

457 """Optional[str]: The URL of the in-progress resumable upload.""" 

458 return self._resumable_url 

459 

460 @property 

461 def bytes_uploaded(self): 

462 """int: Number of bytes that have been uploaded.""" 

463 return self._bytes_uploaded 

464 

465 @property 

466 def total_bytes(self): 

467 """Optional[int]: The total number of bytes to be uploaded. 

468 

469 If this upload is initiated (via :meth:`initiate`) with 

470 ``stream_final=True``, this value will be populated based on the size 

471 of the ``stream`` being uploaded. (By default ``stream_final=True``.) 

472 

473 If this upload is initiated with ``stream_final=False``, 

474 :attr:`total_bytes` will be :data:`None` since it cannot be 

475 determined from the stream. 

476 """ 

477 return self._total_bytes 

478 

479 def _prepare_initiate_request( 

480 self, 

481 stream, 

482 metadata, 

483 content_type, 

484 total_bytes=None, 

485 stream_final=True, 

486 ): 

487 """Prepare the contents of HTTP request to initiate upload. 

488 

489 This is everything that must be done before a request that doesn't 

490 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

491 philosophy. 

492 

493 Args: 

494 stream (IO[bytes]): The stream (i.e. file-like object) that will 

495 be uploaded. The stream **must** be at the beginning (i.e. 

496 ``stream.tell() == 0``). 

497 metadata (Mapping[str, str]): The resource metadata, such as an 

498 ACL list. 

499 content_type (str): The content type of the resource, e.g. a JPEG 

500 image has content type ``image/jpeg``. 

501 total_bytes (Optional[int]): The total number of bytes to be 

502 uploaded. If specified, the upload size **will not** be 

503 determined from the stream (even if ``stream_final=True``). 

504 stream_final (Optional[bool]): Indicates if the ``stream`` is 

505 "final" (i.e. no more bytes will be added to it). In this case 

506 we determine the upload size from the size of the stream. If 

507 ``total_bytes`` is passed, this argument will be ignored. 

508 

509 Returns: 

510 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

511 

512 * HTTP verb for the request (always POST) 

513 * the URL for the request 

514 * the body of the request 

515 * headers for the request 

516 

517 Raises: 

518 ValueError: If the current upload has already been initiated. 

519 ValueError: If ``stream`` is not at the beginning. 

520 

521 .. _sans-I/O: https://sans-io.readthedocs.io/ 

522 """ 

523 if self.resumable_url is not None: 

524 raise ValueError("This upload has already been initiated.") 

525 if stream.tell() != 0: 

526 raise ValueError("Stream must be at beginning.") 

527 

528 self._stream = stream 

529 self._content_type = content_type 

530 

531 # Signed URL requires content type set directly - not through x-upload-content-type 

532 parse_result = urllib.parse.urlparse(self.upload_url) 

533 parsed_query = urllib.parse.parse_qs(parse_result.query) 

534 if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query: 

535 # Deconstruct **self._headers first so that content type defined here takes priority 

536 headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type} 

537 else: 

538 # Deconstruct **self._headers first so that content type defined here takes priority 

539 headers = { 

540 **self._headers, 

541 _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8", 

542 "x-upload-content-type": content_type, 

543 } 

544 # Set the total bytes if possible. 

545 if total_bytes is not None: 

546 self._total_bytes = total_bytes 

547 elif stream_final: 

548 self._total_bytes = get_total_bytes(stream) 

549 # Add the total bytes to the headers if set. 

550 if self._total_bytes is not None: 

551 content_length = "{:d}".format(self._total_bytes) 

552 headers["x-upload-content-length"] = content_length 

553 

554 payload = json.dumps(metadata).encode("utf-8") 

555 return _POST, self.upload_url, payload, headers 

556 

557 def _process_initiate_response(self, response): 

558 """Process the response from an HTTP request that initiated upload. 

559 

560 This is everything that must be done after a request that doesn't 

561 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

562 philosophy. 

563 

564 This method takes the URL from the ``Location`` header and stores it 

565 for future use. Within that URL, we assume the ``upload_id`` query 

566 parameter has been included, but we do not check. 

567 

568 Args: 

569 response (object): The HTTP response object (need headers). 

570 

571 .. _sans-I/O: https://sans-io.readthedocs.io/ 

572 """ 

573 _helpers.require_status_code( 

574 response, 

575 (http.client.OK, http.client.CREATED), 

576 self._get_status_code, 

577 callback=self._make_invalid, 

578 ) 

579 self._resumable_url = _helpers.header_required( 

580 response, "location", self._get_headers 

581 ) 

582 

583 def initiate( 

584 self, 

585 transport, 

586 stream, 

587 metadata, 

588 content_type, 

589 total_bytes=None, 

590 stream_final=True, 

591 timeout=None, 

592 ): 

593 """Initiate a resumable upload. 

594 

595 By default, this method assumes your ``stream`` is in a "final" 

596 state ready to transmit. However, ``stream_final=False`` can be used 

597 to indicate that the size of the resource is not known. This can happen 

598 if bytes are being dynamically fed into ``stream``, e.g. if the stream 

599 is attached to application logs. 

600 

601 If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be 

602 read from the stream every time :meth:`transmit_next_chunk` is called. 

603 If one of those reads produces strictly fewer bytes than the chunk

604 size, the upload will be concluded. 

605 

606 Args: 

607 transport (object): An object which can make authenticated 

608 requests. 

609 stream (IO[bytes]): The stream (i.e. file-like object) that will 

610 be uploaded. The stream **must** be at the beginning (i.e. 

611 ``stream.tell() == 0``). 

612 metadata (Mapping[str, str]): The resource metadata, such as an 

613 ACL list. 

614 content_type (str): The content type of the resource, e.g. a JPEG 

615 image has content type ``image/jpeg``. 

616 total_bytes (Optional[int]): The total number of bytes to be 

617 uploaded. If specified, the upload size **will not** be 

618 determined from the stream (even if ``stream_final=True``). 

619 stream_final (Optional[bool]): Indicates if the ``stream`` is 

620 "final" (i.e. no more bytes will be added to it). In this case 

621 we determine the upload size from the size of the stream. If 

622 ``total_bytes`` is passed, this argument will be ignored. 

623 timeout (Optional[Union[float, Tuple[float, float]]]): 

624 The number of seconds to wait for the server response. 

625 Depending on the retry strategy, a request may be repeated 

626 several times using the same timeout each time. 

627 

628 Can also be passed as a tuple (connect_timeout, read_timeout). 

629 See :meth:`requests.Session.request` documentation for details. 

630 

631 Raises: 

632 NotImplementedError: Always, since virtual. 

633 """ 

634 raise NotImplementedError("This implementation is virtual.") 

635 

636 def _prepare_request(self): 

637 """Prepare the contents of HTTP request to upload a chunk. 

638 

639 This is everything that must be done before a request that doesn't 

640 require network I/O. This is based on the `sans-I/O`_ philosophy. 

641 

642 For the time being, this **does require** some form of I/O to read 

643 a chunk from ``stream`` (via :func:`get_next_chunk`). However, this 

644 will (almost) certainly not be network I/O. 

645 

646 Returns: 

647 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

648 

649 * HTTP verb for the request (always PUT) 

650 * the URL for the request 

651 * the body of the request 

652 * headers for the request 

653 

654 The headers incorporate the ``_headers`` on the current instance. 

655 

656 Raises: 

657 ValueError: If the current upload has finished. 

658 ValueError: If the current upload is in an invalid state. 

659 ValueError: If the current upload has not been initiated. 

660 ValueError: If the location in the stream (i.e. ``stream.tell()``) 

661 does not agree with ``bytes_uploaded``. 

662 

663 .. _sans-I/O: https://sans-io.readthedocs.io/ 

664 """ 

665 if self.finished: 

666 raise ValueError("Upload has finished.") 

667 if self.invalid: 

668 raise ValueError( 

669 "Upload is in an invalid state. To recover call `recover()`." 

670 ) 

671 if self.resumable_url is None: 

672 raise ValueError( 

673 "This upload has not been initiated. Please call " 

674 "initiate() before beginning to transmit chunks." 

675 ) 

676 

677 start_byte, payload, content_range = get_next_chunk( 

678 self._stream, self._chunk_size, self._total_bytes 

679 ) 

680 if start_byte != self.bytes_uploaded: 

681 msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded) 

682 raise ValueError(msg) 

683 

684 self._update_checksum(start_byte, payload) 

685 

686 headers = { 

687 **self._headers, 

688 _CONTENT_TYPE_HEADER: self._content_type, 

689 _helpers.CONTENT_RANGE_HEADER: content_range, 

690 } 

691 if (start_byte + len(payload) == self._total_bytes) and ( 

692 self._checksum_object is not None 

693 ): 

694 local_checksum = _helpers.prepare_checksum_digest( 

695 self._checksum_object.digest() 

696 ) 

697 headers["x-goog-hash"] = f"{self._checksum_type}={local_checksum}" 

698 return _PUT, self.resumable_url, payload, headers 

699 

700 def _update_checksum(self, start_byte, payload): 

701 """Update the checksum with the payload if not already updated. 

702 

703 Because error recovery can result in bytes being transmitted more than 

704 once, the checksum tracks the number of bytes checked in 

705 self._bytes_checksummed and skips bytes that have already been summed. 

706 """ 

707 if not self._checksum_type: 

708 return 

709 

710 if not self._checksum_object: 

711 self._checksum_object = _helpers._get_checksum_object(self._checksum_type) 

712 

713 if start_byte < self._bytes_checksummed: 

714 offset = self._bytes_checksummed - start_byte 

715 data = payload[offset:] 

716 else: 

717 data = payload 

718 

719 self._checksum_object.update(data) 

720 self._bytes_checksummed += len(data) 

721 

722 def _make_invalid(self): 

723 """Simple setter for ``invalid``. 

724 

725 This is intended to be passed along as a callback to helpers that 

726 raise an exception so they can mark this instance as invalid before 

727 raising. 

728 """ 

729 self._invalid = True 

730 

731 def _process_resumable_response(self, response, bytes_sent): 

732 """Process the response from an HTTP request. 

733 

734 This is everything that must be done after a request that doesn't 

735 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

736 philosophy. 

737 

738 Args: 

739 response (object): The HTTP response object. 

740 bytes_sent (int): The number of bytes sent in the request that 

741 ``response`` was returned for. 

742 

743 Raises: 

744 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

745 code is 308 and the ``range`` header is not of the form 

746 ``bytes 0-{end}``. 

747 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

748 code is not 200 or 308. 

749 

750 .. _sans-I/O: https://sans-io.readthedocs.io/ 

751 """ 

752 status_code = _helpers.require_status_code( 

753 response, 

754 (http.client.OK, http.client.PERMANENT_REDIRECT), 

755 self._get_status_code, 

756 callback=self._make_invalid, 

757 ) 

758 if status_code == http.client.OK: 

759 # NOTE: We use the "local" information of ``bytes_sent`` to update 

760 # ``bytes_uploaded``, but do not verify this against other 

761 # state. However, there may be some other information: 

762 # 

763 # * a ``size`` key in JSON response body 

764 # * the ``total_bytes`` attribute (if set) 

765 # * ``stream.tell()`` (relying on fact that ``initiate()`` 

766 # requires stream to be at the beginning) 

767 self._bytes_uploaded = self._bytes_uploaded + bytes_sent 

768 # Tombstone the current upload so it cannot be used again. 

769 self._finished = True 

770 # Validate the checksum. This can raise an exception on failure. 

771 self._validate_checksum(response) 

772 else: 

773 bytes_range = _helpers.header_required( 

774 response, 

775 _helpers.RANGE_HEADER, 

776 self._get_headers, 

777 callback=self._make_invalid, 

778 ) 

779 match = _BYTES_RANGE_RE.match(bytes_range) 

780 if match is None: 

781 self._make_invalid() 

782 raise InvalidResponse( 

783 response, 

784 'Unexpected "range" header', 

785 bytes_range, 

786 'Expected to be of the form "bytes=0-{end}"', 

787 ) 

788 self._bytes_uploaded = int(match.group("end_byte")) + 1 

789 

790 def _validate_checksum(self, response): 

790 """Check the computed checksum, if any, against the received metadata.

792 

793 Args: 

794 response (object): The HTTP response object. 

795 

796 Raises: 

797 ~google.cloud.storage.exceptions.DataCorruption: If the checksum 

798 computed locally and the checksum reported by the remote host do 

799 not match. 

800 """ 

801 if self._checksum_type is None: 

802 return 

803 metadata_key = _helpers._get_metadata_key(self._checksum_type) 

804 metadata = response.json() 

805 remote_checksum = metadata.get(metadata_key) 

806 if remote_checksum is None: 

807 raise InvalidResponse( 

808 response, 

809 _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key), 

810 self._get_headers(response), 

811 ) 

812 local_checksum = _helpers.prepare_checksum_digest( 

813 self._checksum_object.digest() 

814 ) 

815 if local_checksum != remote_checksum: 

816 raise DataCorruption( 

817 response, 

818 _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( 

819 self._checksum_type.upper(), local_checksum, remote_checksum 

820 ), 

821 ) 

822 

823 def transmit_next_chunk(self, transport, timeout=None): 

824 """Transmit the next chunk of the resource to be uploaded. 

825 

826 If the current upload was initiated with ``stream_final=False``, 

827 this method will dynamically determine if the upload has completed. 

828 The upload will be considered complete if the stream produces 

829 fewer than :attr:`chunk_size` bytes when a chunk is read from it. 

830 

831 Args: 

832 transport (object): An object which can make authenticated 

833 requests. 

834 timeout (Optional[Union[float, Tuple[float, float]]]): 

835 The number of seconds to wait for the server response. 

836 Depending on the retry strategy, a request may be repeated 

837 several times using the same timeout each time. 

838 

839 Can also be passed as a tuple (connect_timeout, read_timeout). 

840 See :meth:`requests.Session.request` documentation for details. 

841 

842 Raises: 

843 NotImplementedError: Always, since virtual. 

844 """ 

845 raise NotImplementedError("This implementation is virtual.") 

846 

847 def _prepare_recover_request(self): 

848 """Prepare the contents of HTTP request to recover from failure. 

849 

850 This is everything that must be done before a request that doesn't 

851 require network I/O. This is based on the `sans-I/O`_ philosophy. 

852 

853 We assume that the :attr:`resumable_url` is set (i.e. the only way 

854 the upload can end up :attr:`invalid` is if it has been initiated).

855 

856 Returns: 

857 Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple 

858 

859 * HTTP verb for the request (always PUT) 

860 * the URL for the request 

861 * the body of the request (always :data:`None`) 

862 * headers for the request 

863 

864 The headers **do not** incorporate the ``_headers`` on the 

865 current instance. 

866 

867 .. _sans-I/O: https://sans-io.readthedocs.io/ 

868 """ 

869 headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"} 

870 return _PUT, self.resumable_url, None, headers 

871 

872 def _process_recover_response(self, response): 

873 """Process the response from an HTTP request to recover from failure. 

874 

875 This is everything that must be done after a request that doesn't 

876 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

877 philosophy. 

878 

879 Args: 

880 response (object): The HTTP response object. 

881 

882 Raises: 

883 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

884 code is not 308. 

885 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

886 code is 308 and the ``range`` header is not of the form 

887 ``bytes 0-{end}``. 

888 

889 .. _sans-I/O: https://sans-io.readthedocs.io/ 

890 """ 

891 _helpers.require_status_code( 

892 response, (http.client.PERMANENT_REDIRECT,), self._get_status_code 

893 ) 

894 headers = self._get_headers(response) 

895 if _helpers.RANGE_HEADER in headers: 

896 bytes_range = headers[_helpers.RANGE_HEADER] 

897 match = _BYTES_RANGE_RE.match(bytes_range) 

898 if match is None: 

899 raise InvalidResponse( 

900 response, 

901 'Unexpected "range" header', 

902 bytes_range, 

903 'Expected to be of the form "bytes=0-{end}"', 

904 ) 

905 self._bytes_uploaded = int(match.group("end_byte")) + 1 

906 else: 

907 # In this case, the upload has not "begun". 

908 self._bytes_uploaded = 0 

909 

910 self._stream.seek(self._bytes_uploaded) 

911 self._invalid = False 

912 

913 def recover(self, transport): 

914 """Recover from a failure. 

915 

916 This method should be used when a :class:`ResumableUpload` is in an 

917 :attr:`~ResumableUpload.invalid` state due to a request failure. 

918 

919 This will verify the progress with the server and make sure the 

920 current upload is in a valid state before :meth:`transmit_next_chunk` 

921 can be used again. 

922 

923 Args: 

924 transport (object): An object which can make authenticated 

925 requests. 

926 

927 Raises: 

928 NotImplementedError: Always, since virtual. 

929 """ 

930 raise NotImplementedError("This implementation is virtual.") 

931 
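# --- Illustrative sketch (not part of the original module) -------------------
# The resumable flow is: prepare/initiate once, then repeatedly prepare and
# send one chunk until finished. Only the "prepare" halves are implemented in
# this module; sending and response processing belong to a transport-aware
# subclass. A minimal sketch of the prepare side, using an in-memory stream
# (``chunk_size`` must be a multiple of ``UPLOAD_CHUNK_SIZE``):


def _example_resumable_upload_prepare():
    import io

    stream = io.BytesIO(b"x" * 1024)
    upload = ResumableUpload(
        "https://storage.example.com/upload?uploadType=resumable",
        chunk_size=UPLOAD_CHUNK_SIZE,
    )
    verb, url, payload, headers = upload._prepare_initiate_request(
        stream, {"name": "example-object"}, "application/octet-stream"
    )
    # verb == "POST"; ``payload`` is the JSON-encoded metadata, and ``headers``
    # carries "x-upload-content-type" and "x-upload-content-length".
    # After the initiate response is processed (setting ``resumable_url``),
    # each chunk is prepared with ``upload._prepare_request()`` and sent as a
    # PUT carrying a "content-range" header.
    return verb, url, payload, headers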

932 

933class XMLMPUContainer(UploadBase): 

934 """Initiate and close an upload using the XML MPU API. 

935 

936 An XML MPU sends an initial request and then receives an upload ID. 

937 Using the upload ID, the upload is then done in numbered parts and the 

938 parts can be uploaded concurrently. 

939 

940 In order to avoid concurrency issues with this container object, the 

941 uploading of individual parts is handled separately, by XMLMPUPart objects 

942 spawned from this container class. The XMLMPUPart objects are not 

943 necessarily in the same process as the container, so they do not update the 

944 container automatically. 

945 

946 MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous 

947 given the JSON multipart upload, so the abbreviation "MPU" will be used 

948 throughout. 

949 

950 See: https://cloud.google.com/storage/docs/multipart-uploads 

951 

952 Args: 

953 upload_url (str): The URL of the object (without query parameters). The 

954 initiate, PUT, and finalization requests will all use this URL, with 

955 varying query parameters. 

956 filename (str): The name (path) of the file to upload. 

957 headers (Optional[Mapping[str, str]]): Extra headers that should 

958 be sent with every request. 

959 retry (Optional[google.api_core.retry.Retry]): How to retry the 

960 RPC. A None value will disable retries. A 

961 google.api_core.retry.Retry value will enable retries, and the 

962 object will configure backoff and timeout options. 

963 

964 See the retry.py source code and docstrings in this package 

965 (google.cloud.storage.retry) for information on retry types and how 

966 to configure them. 

967 

968 Attributes: 

969 upload_url (str): The URL where the content will be uploaded. 

970 upload_id (Optional(str)): The ID of the upload from the initialization 

971 response. 

972 """ 

973 

974 def __init__( 

975 self, 

976 upload_url, 

977 filename, 

978 headers=None, 

979 upload_id=None, 

980 retry=DEFAULT_RETRY, 

981 ): 

982 super().__init__(upload_url, headers=headers, retry=retry) 

983 self._filename = filename 

984 self._upload_id = upload_id 

985 self._parts = {} 

986 

987 @property 

988 def upload_id(self): 

989 return self._upload_id 

990 

991 def register_part(self, part_number, etag): 

992 """Register an uploaded part by part number and corresponding etag. 

993 

994 XMLMPUPart objects represent individual parts, and their part number 

995 and etag can be registered to the container object with this method 

996 and therefore incorporated in the finalize() call to finish the upload. 

997 

998 This method accepts part_number and etag, but not XMLMPUPart objects 

999 themselves, to reduce the complexity involved in running XMLMPUPart 

1000 uploads in separate processes. 

1001 

1002 Args: 

1003 part_number (int): The part number. Parts are assembled into the 

1004 final uploaded object with finalize() in order of their part 

1005 numbers. 

1006 etag (str): The etag included in the server response after upload. 

1007 """ 

1008 self._parts[part_number] = etag 

1009 

1010 def _prepare_initiate_request(self, content_type): 

1011 """Prepare the contents of HTTP request to initiate upload. 

1012 

1013 This is everything that must be done before a request that doesn't 

1014 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1015 philosophy. 

1016 

1017 Args: 

1018 content_type (str): The content type of the resource, e.g. a JPEG 

1019 image has content type ``image/jpeg``. 

1020 

1021 Returns: 

1022 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1023 

1024 * HTTP verb for the request (always POST) 

1025 * the URL for the request 

1026 * the body of the request 

1027 * headers for the request 

1028 

1029 Raises: 

1030 ValueError: If the current upload has already been initiated. 

1031 

1032 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1033 """ 

1034 if self.upload_id is not None: 

1035 raise ValueError("This upload has already been initiated.") 

1036 

1037 initiate_url = self.upload_url + _MPU_INITIATE_QUERY 

1038 

1039 headers = { 

1040 **self._headers, 

1041 _CONTENT_TYPE_HEADER: content_type, 

1042 } 

1043 return _POST, initiate_url, None, headers 

1044 

1045 def _process_initiate_response(self, response): 

1046 """Process the response from an HTTP request that initiated the upload. 

1047 

1048 This is everything that must be done after a request that doesn't 

1049 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1050 philosophy. 

1051 

1052 This method parses the ``UploadId`` element out of the XML response

1053 body and stores it as the upload ID for future use. The response body

1054 is assumed to contain that element; we do not check for its absence.

1055 

1056 Args: 

1057 response (object): The HTTP response object. 

1058 

1059 Raises: 

1060 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1061 code is not 200. 

1062 

1063 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1064 """ 

1065 _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 

1066 root = ElementTree.fromstring(response.text) 

1067 self._upload_id = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE).text 

1068 

1069 def initiate( 

1070 self, 

1071 transport, 

1072 content_type, 

1073 timeout=None, 

1074 ): 

1075 """Initiate an MPU and record the upload ID. 

1076 

1077 Args: 

1078 transport (object): An object which can make authenticated 

1079 requests. 

1080 content_type (str): The content type of the resource, e.g. a JPEG 

1081 image has content type ``image/jpeg``. 

1082 timeout (Optional[Union[float, Tuple[float, float]]]): 

1083 The number of seconds to wait for the server response. 

1084 Depending on the retry strategy, a request may be repeated 

1085 several times using the same timeout each time. 

1086 

1087 Can also be passed as a tuple (connect_timeout, read_timeout). 

1088 See :meth:`requests.Session.request` documentation for details. 

1089 

1090 Raises: 

1091 NotImplementedError: Always, since virtual. 

1092 """ 

1093 raise NotImplementedError("This implementation is virtual.") 

1094 

1095 def _prepare_finalize_request(self): 

1096 """Prepare the contents of an HTTP request to finalize the upload. 

1097 

1098 All of the parts must be registered before calling this method. 

1099 

1100 Returns: 

1101 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1102 

1103 * HTTP verb for the request (always POST) 

1104 * the URL for the request 

1105 * the body of the request 

1106 * headers for the request 

1107 

1108 Raises: 

1109 ValueError: If the upload has not been initiated. 

1110 """ 

1111 if self.upload_id is None: 

1112 raise ValueError("This upload has not yet been initiated.") 

1113 

1114 final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id) 

1115 finalize_url = self.upload_url + final_query 

1116 final_xml_root = ElementTree.Element("CompleteMultipartUpload") 

1117 for part_number, etag in self._parts.items(): 

1118 part = ElementTree.SubElement(final_xml_root, "Part")  # one <Part> per registered part

1119 ElementTree.SubElement(part, "PartNumber").text = str(part_number) 

1120 ElementTree.SubElement(part, "ETag").text = etag 

1121 payload = ElementTree.tostring(final_xml_root) 

1122 return _POST, finalize_url, payload, self._headers 

1123 

1124 def _process_finalize_response(self, response): 

1125 """Process the response from an HTTP request that finalized the upload. 

1126 

1127 This is everything that must be done after a request that doesn't 

1128 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1129 philosophy. 

1130 

1131 Args: 

1132 response (object): The HTTP response object. 

1133 

1134 Raises: 

1135 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1136 code is not 200. 

1137 

1138 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1139 """ 

1140 

1141 _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 

1142 self._finished = True 

1143 

1144 def finalize( 

1145 self, 

1146 transport, 

1147 timeout=None, 

1148 ): 

1149 """Finalize an MPU request with all the parts. 

1150 

1151 Args: 

1152 transport (object): An object which can make authenticated 

1153 requests. 

1154 timeout (Optional[Union[float, Tuple[float, float]]]): 

1155 The number of seconds to wait for the server response. 

1156 Depending on the retry strategy, a request may be repeated 

1157 several times using the same timeout each time. 

1158 

1159 Can also be passed as a tuple (connect_timeout, read_timeout). 

1160 See :meth:`requests.Session.request` documentation for details. 

1161 

1162 Raises: 

1163 NotImplementedError: Always, since virtual. 

1164 """ 

1165 raise NotImplementedError("This implementation is virtual.") 

1166 

1167 def _prepare_cancel_request(self): 

1168 """Prepare the contents of an HTTP request to cancel the upload. 

1169 

1170 Returns: 

1171 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1172 

1173 * HTTP verb for the request (always DELETE) 

1174 * the URL for the request 

1175 * the body of the request 

1176 * headers for the request 

1177 

1178 Raises: 

1179 ValueError: If the upload has not been initiated. 

1180 """ 

1181 if self.upload_id is None: 

1182 raise ValueError("This upload has not yet been initiated.") 

1183 

1184 cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id) 

1185 cancel_url = self.upload_url + cancel_query 

1186 return _DELETE, cancel_url, None, self._headers 

1187 

1188 def _process_cancel_response(self, response): 

1189 """Process the response from an HTTP request that canceled the upload. 

1190 

1191 This is everything that must be done after a request that doesn't 

1192 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1193 philosophy. 

1194 

1195 Args: 

1196 response (object): The HTTP response object. 

1197 

1198 Raises: 

1199 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1200 code is not 204. 

1201 

1202 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1203 """ 

1204 

1205 _helpers.require_status_code( 

1206 response, (http.client.NO_CONTENT,), self._get_status_code 

1207 ) 

1208 

1209 def cancel( 

1210 self, 

1211 transport, 

1212 timeout=None, 

1213 ): 

1214 """Cancel an MPU request and permanently delete any uploaded parts. 

1215 

1216 This cannot be undone. 

1217 

1218 Args: 

1219 transport (object): An object which can make authenticated 

1220 requests. 

1221 timeout (Optional[Union[float, Tuple[float, float]]]): 

1222 The number of seconds to wait for the server response. 

1223 Depending on the retry strategy, a request may be repeated 

1224 several times using the same timeout each time. 

1225 

1226 Can also be passed as a tuple (connect_timeout, read_timeout). 

1227 See :meth:`requests.Session.request` documentation for details. 

1228 

1229 Raises: 

1230 NotImplementedError: Always, since virtual. 

1231 """ 

1232 raise NotImplementedError("This implementation is virtual.") 

1233 
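# --- Illustrative sketch (not part of the original module) -------------------
# The container only tracks the upload ID and the (part_number -> etag) map;
# the bytes themselves are sent by XMLMPUPart objects (defined below), possibly
# from other processes. A minimal sketch of the sans-I/O container side,
# assuming the upload ID and part etags were obtained elsewhere; all values
# below are placeholders.


def _example_xml_mpu_container():
    container = XMLMPUContainer(
        "https://storage.example.com/example-bucket/example-object",
        "example-file.bin",
        upload_id="example-upload-id",
    )
    container.register_part(1, '"etag-for-part-1"')
    container.register_part(2, '"etag-for-part-2"')
    verb, url, payload, headers = container._prepare_finalize_request()
    # verb == "POST"; ``url`` ends with "?uploadId=example-upload-id" and
    # ``payload`` is a <CompleteMultipartUpload> XML document listing the parts.
    return verb, url, payload, headers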

1234 

1235class XMLMPUPart(UploadBase): 

1236 """Upload a single part of an existing XML MPU container. 

1237 

1238 An XML MPU sends an initial request and then receives an upload ID. 

1239 Using the upload ID, the upload is then done in numbered parts and the 

1240 parts can be uploaded concurrently. 

1241 

1242 In order to avoid concurrency issues with the container object, the 

1243 uploading of individual parts is handled separately by multiple objects 

1244 of this class. Once a part is uploaded, it can be registered with the 

1245 container with `container.register_part(part.part_number, part.etag)`. 

1246 

1247 MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous 

1248 given the JSON multipart upload, so the abbreviation "MPU" will be used 

1249 throughout. 

1250 

1251 See: https://cloud.google.com/storage/docs/multipart-uploads 

1252 

1253 Args: 

1254 upload_url (str): The URL of the object (without query parameters). 

1255 upload_id (str): The ID of the upload from the initialization response. 

1256 filename (str): The name (path) of the file to upload. 

1257 start (int): The byte index of the beginning of the part. 

1258 end (int): The byte index of the end of the part. 

1259 part_number (int): The part number. Part numbers will be assembled in 

1260 sequential order when the container is finalized. 

1261 headers (Optional[Mapping[str, str]]): Extra headers that should 

1262 be sent with every request. 

1263 checksum (Optional[str]): The type of checksum to compute to verify

1264 the integrity of the object. The request headers will be amended 

1265 to include the computed value. Supported values are "md5", "crc32c", 

1266 "auto" and None. The default is "auto", which will try to detect if 

1267 the C extension for crc32c is installed and fall back to md5 

1268 otherwise. 

1269 retry (Optional[google.api_core.retry.Retry]): How to retry the 

1270 RPC. A None value will disable retries. A 

1271 google.api_core.retry.Retry value will enable retries, and the 

1272 object will configure backoff and timeout options. 

1273 

1274 See the retry.py source code and docstrings in this package 

1275 (google.cloud.storage.retry) for information on retry types and how 

1276 to configure them. 

1277 

1278 Attributes: 

1279 upload_url (str): The URL of the object (without query parameters). 

1280 upload_id (str): The ID of the upload from the initialization response. 

1281 filename (str): The name (path) of the file to upload. 

1282 start (int): The byte index of the beginning of the part. 

1283 end (int): The byte index of the end of the part. 

1284 part_number (int): The part number. Part numbers will be assembled in 

1285 sequential order when the container is finalized. 

1286 etag (Optional(str)): The etag returned by the service after upload. 

1287 """ 

1288 

1289 def __init__( 

1290 self, 

1291 upload_url, 

1292 upload_id, 

1293 filename, 

1294 start, 

1295 end, 

1296 part_number, 

1297 headers=None, 

1298 checksum="auto", 

1299 retry=DEFAULT_RETRY, 

1300 ): 

1301 super().__init__(upload_url, headers=headers, retry=retry) 

1302 self._filename = filename 

1303 self._start = start 

1304 self._end = end 

1305 self._upload_id = upload_id 

1306 self._part_number = part_number 

1307 self._etag = None 

1308 self._checksum_type = checksum 

1309 if self._checksum_type == "auto": 

1310 self._checksum_type = ( 

1311 "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 

1312 ) 

1313 self._checksum_object = None 

1314 

1315 @property 

1316 def part_number(self): 

1317 return self._part_number 

1318 

1319 @property 

1320 def upload_id(self): 

1321 return self._upload_id 

1322 

1323 @property 

1324 def filename(self): 

1325 return self._filename 

1326 

1327 @property 

1328 def etag(self): 

1329 return self._etag 

1330 

1331 @property 

1332 def start(self): 

1333 return self._start 

1334 

1335 @property 

1336 def end(self): 

1337 return self._end 

1338 

1339 def _prepare_upload_request(self): 

1340 """Prepare the contents of HTTP request to upload a part. 

1341 

1342 This is everything that must be done before a request that doesn't 

1343 require network I/O. This is based on the `sans-I/O`_ philosophy. 

1344 

1345 For the time being, this **does require** some form of I/O to read 

1346 a part from ``stream`` (via :func:`get_part_payload`). However, this 

1347 will (almost) certainly not be network I/O. 

1348 

1349 Returns: 

1350 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1351 

1352 * HTTP verb for the request (always PUT) 

1353 * the URL for the request 

1354 * the body of the request 

1355 * headers for the request 

1356 

1357 The headers incorporate the ``_headers`` on the current instance. 

1358 

1359 Raises: 

1360 ValueError: If the current upload has finished. 

1361 

1362 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1363 """ 

1364 if self.finished: 

1365 raise ValueError("This part has already been uploaded.") 

1366 

1367 with open(self._filename, "br") as f: 

1368 f.seek(self._start) 

1369 payload = f.read(self._end - self._start) 

1370 

1371 self._checksum_object = _helpers._get_checksum_object(self._checksum_type) 

1372 if self._checksum_object is not None: 

1373 self._checksum_object.update(payload) 

1374 

1375 part_query = _MPU_PART_QUERY_TEMPLATE.format( 

1376 part=self._part_number, upload_id=self._upload_id 

1377 ) 

1378 upload_url = self.upload_url + part_query 

1379 return _PUT, upload_url, payload, self._headers 

1380 

1381 def _process_upload_response(self, response): 

1382 """Process the response from an HTTP request. 

1383 

1384 This is everything that must be done after a request that doesn't 

1385 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1386 philosophy. 

1387 

1388 Args: 

1389 response (object): The HTTP response object. 

1390 

1391 Raises: 

1392 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1393 code is not 200 or the response is missing data. 

1394 

1395 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1396 """ 

1397 # Data corruption errors shouldn't be treated as invalid responses,

1398 # so we handle them before calling `_helpers.require_status_code`.

1399 # If the response is 400, we check for data corruption errors. 

1400 if response.status_code == 400: 

1401 root = ElementTree.fromstring(response.text) 

1402 error_code = root.find("Code").text 

1403 error_message = root.find("Message").text 

1404 error_details = root.find("Details").text 

1405 if error_code in ["InvalidDigest", "BadDigest", "CrcMismatch"]: 

1406 raise DataCorruption( 

1407 response, 

1408 ( 

1409 "Checksum mismatch: checksum calculated by client and" 

1410 " server did not match. Error code: {error_code}," 

1411 " Error message: {error_message}," 

1412 " Error details: {error_details}" 

1413 ).format( 

1414 error_code=error_code, 

1415 error_message=error_message, 

1416 error_details=error_details, 

1417 ), 

1418 ) 

1419 

1420 _helpers.require_status_code( 

1421 response, 

1422 (http.client.OK,), 

1423 self._get_status_code, 

1424 ) 

1425 

1426 self._validate_checksum(response) 

1427 

1428 etag = _helpers.header_required(response, "etag", self._get_headers) 

1429 self._etag = etag 

1430 self._finished = True 

1431 

1432 def upload( 

1433 self, 

1434 transport, 

1435 timeout=None, 

1436 ): 

1437 """Upload the part. 

1438 

1439 Args: 

1440 transport (object): An object which can make authenticated 

1441 requests. 

1442 timeout (Optional[Union[float, Tuple[float, float]]]): 

1443 The number of seconds to wait for the server response. 

1444 Depending on the retry strategy, a request may be repeated 

1445 several times using the same timeout each time. 

1446 

1447 Can also be passed as a tuple (connect_timeout, read_timeout). 

1448 See :meth:`requests.Session.request` documentation for details. 

1449 

1450 Raises: 

1451 NotImplementedError: Always, since virtual. 

1452 """ 

1453 raise NotImplementedError("This implementation is virtual.") 

1454 

1455 def _validate_checksum(self, response): 

1456 """Check the computed checksum, if any, against the response headers. 

1457 

1458 Args: 

1459 response (object): The HTTP response object. 

1460 

1461 Raises: 

1462 ~google.cloud.storage.exceptions.DataCorruption: If the checksum 

1463 computed locally and the checksum reported by the remote host do 

1464 not match. 

1465 """ 

1466 if self._checksum_type is None: 

1467 return 

1468 

1469 remote_checksum = _helpers._get_uploaded_checksum_from_headers( 

1470 response, self._get_headers, self._checksum_type 

1471 ) 

1472 

1473 if remote_checksum is None: 

1474 metadata_key = _helpers._get_metadata_key(self._checksum_type) 

1475 raise InvalidResponse( 

1476 response, 

1477 _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key), 

1478 self._get_headers(response), 

1479 ) 

1480 local_checksum = _helpers.prepare_checksum_digest( 

1481 self._checksum_object.digest() 

1482 ) 

1483 if local_checksum != remote_checksum: 

1484 raise DataCorruption( 

1485 response, 

1486 _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( 

1487 self._checksum_type.upper(), local_checksum, remote_checksum 

1488 ), 

1489 ) 

1490 
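# --- Illustrative sketch (not part of the original module) -------------------
# Each part reads its own byte range of the source file and uploads it
# independently; afterward its part number and etag are registered with the
# container. The filename, URL, and upload ID below are placeholders.


def _example_xml_mpu_part_prepare():
    part = XMLMPUPart(
        "https://storage.example.com/example-bucket/example-object",
        "example-upload-id",
        "example-file.bin",
        start=0,
        end=1024,
        part_number=1,
        checksum=None,
    )
    verb, url, payload, headers = part._prepare_upload_request()
    # verb == "PUT"; ``url`` carries "?partNumber=1&uploadId=example-upload-id"
    # and ``payload`` holds bytes [0, 1024) of the file.
    return verb, url, payload, headers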

1491 

1492def get_boundary(): 

1493 """Get a random boundary for a multipart request. 

1494 

1495 Returns: 

1496 bytes: The boundary used to separate parts of a multipart request. 

1497 """ 

1498 random_int = random.randrange(sys.maxsize) 

1499 boundary = _BOUNDARY_FORMAT.format(random_int) 

1500 # NOTE: Neither % formatting nor .format() is available for byte strings

1501 # in Python 3.4, so we must use unicode strings as templates. 

1502 return boundary.encode("utf-8") 

1503 

1504 

1505def construct_multipart_request(data, metadata, content_type): 

1506 """Construct a multipart request body. 

1507 

1508 Args: 

1509 data (bytes): The resource content (UTF-8 encoded as bytes) 

1510 to be uploaded. 

1511 metadata (Mapping[str, str]): The resource metadata, such as an 

1512 ACL list. 

1513 content_type (str): The content type of the resource, e.g. a JPEG 

1514 image has content type ``image/jpeg``. 

1515 

1516 Returns: 

1517 Tuple[bytes, bytes]: The multipart request body and the boundary used 

1518 between each part. 

1519 """ 

1520 multipart_boundary = get_boundary() 

1521 json_bytes = json.dumps(metadata).encode("utf-8") 

1522 content_type = content_type.encode("utf-8") 

1523 # Combine the two parts into a multipart payload. 

1524 # NOTE: We'd prefer a bytes template but are restricted by Python 3.4. 

1525 boundary_sep = _MULTIPART_SEP + multipart_boundary 

1526 content = ( 

1527 boundary_sep 

1528 + _MULTIPART_BEGIN 

1529 + json_bytes 

1530 + _CRLF 

1531 + boundary_sep 

1532 + _CRLF 

1533 + b"content-type: " 

1534 + content_type 

1535 + _CRLF 

1536 + _CRLF 

1537 + data # Empty line between headers and body. 

1538 + _CRLF 

1539 + boundary_sep 

1540 + _MULTIPART_SEP 

1541 ) 

1542 

1543 return content, multipart_boundary 

1544 
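# --- Illustrative sketch (not part of the original module) -------------------
# The resulting body has two parts separated by the random boundary: a JSON
# metadata part and the raw payload, e.g. (boundary shortened here):
#
#   --===============NNN...==
#   content-type: application/json; charset=UTF-8
#
#   {"name": "example-object"}
#   --===============NNN...==
#   content-type: text/plain
#
#   hello world
#   --===============NNN...==--
#
# A minimal usage sketch:


def _example_construct_multipart_request():
    content, boundary = construct_multipart_request(
        b"hello world", {"name": "example-object"}, "text/plain"
    )
    assert boundary in content
    return content, boundary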

1545 

1546def get_total_bytes(stream): 

1547 """Determine the total number of bytes in a stream. 

1548 

1549 Args: 

1550 stream (IO[bytes]): The stream (i.e. file-like object). 

1551 

1552 Returns: 

1553 int: The number of bytes. 

1554 """ 

1555 current_position = stream.tell() 

1556 # NOTE: ``.seek()`` **should** return the same value that ``.tell()`` 

1557 # returns, but in Python 2, ``file`` objects do not. 

1558 stream.seek(0, os.SEEK_END) 

1559 end_position = stream.tell() 

1560 # Go back to the initial position. 

1561 stream.seek(current_position) 

1562 

1563 return end_position 

1564 
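# --- Illustrative sketch (not part of the original module) -------------------
# ``get_total_bytes`` measures the stream without disturbing the caller's
# position:


def _example_get_total_bytes():
    import io

    stream = io.BytesIO(b"0123456789")
    stream.seek(4)
    assert get_total_bytes(stream) == 10
    assert stream.tell() == 4  # position is restored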

1565 

1566def get_next_chunk(stream, chunk_size, total_bytes): 

1567 """Get a chunk from an I/O stream. 

1568 

1569 The ``stream`` may have fewer bytes remaining than ``chunk_size`` 

1570 so it may not always be the case that 

1571 ``end_byte == start_byte + chunk_size - 1``. 

1572 

1573 Args: 

1574 stream (IO[bytes]): The stream (i.e. file-like object). 

1575 chunk_size (int): The size of the chunk to be read from the ``stream``. 

1576 total_bytes (Optional[int]): The (expected) total number of bytes 

1577 in the ``stream``. 

1578 

1579 Returns: 

1580 Tuple[int, bytes, str]: Triple of: 

1581 

1582 * the start byte index 

1583 * the content in between the start and end bytes (inclusive) 

1584 * content range header for the chunk (slice) that has been read 

1585 

1586 Raises: 

1587 ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields 

1588 non-empty content. 

1589 ValueError: If there is no data left to consume. This corresponds 

1590 exactly to the case ``end_byte < start_byte``, which can only 

1591 occur if ``end_byte == start_byte - 1``. 

1592 """ 

1593 start_byte = stream.tell() 

1594 if total_bytes is not None and start_byte + chunk_size >= total_bytes > 0: 

1595 payload = stream.read(total_bytes - start_byte) 

1596 else: 

1597 payload = stream.read(chunk_size) 

1598 end_byte = stream.tell() - 1 

1599 

1600 num_bytes_read = len(payload) 

1601 if total_bytes is None: 

1602 if num_bytes_read < chunk_size: 

1603 # We now **KNOW** the total number of bytes. 

1604 total_bytes = end_byte + 1 

1605 elif total_bytes == 0: 

1606 # NOTE: We also expect ``start_byte == 0`` here but don't check 

1607 # because ``_prepare_initiate_request()`` requires the 

1608 # stream to be at the beginning. 

1609 if num_bytes_read != 0: 

1610 raise ValueError( 

1611 "Stream specified as empty, but produced non-empty content." 

1612 ) 

1613 else: 

1614 if num_bytes_read == 0: 

1615 raise ValueError( 

1616 "Stream is already exhausted. There is no content remaining." 

1617 ) 

1618 

1619 content_range = get_content_range(start_byte, end_byte, total_bytes) 

1620 return start_byte, payload, content_range 

1621 
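# --- Illustrative sketch (not part of the original module) -------------------
# ``get_next_chunk`` reads at most ``chunk_size`` bytes from the current
# position and reports the slice it covered:


def _example_get_next_chunk():
    import io

    stream = io.BytesIO(b"0123456789")
    start_byte, payload, content_range = get_next_chunk(stream, 4, total_bytes=10)
    # start_byte == 0, payload == b"0123", content_range == "bytes 0-3/10"
    return start_byte, payload, content_range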

1622 

1623def get_content_range(start_byte, end_byte, total_bytes): 

1624 """Convert start, end and total into content range header. 

1625 

1626 If ``total_bytes`` is not known, uses "bytes {start}-{end}/*". 

1627 If we are dealing with an empty range (i.e. ``end_byte < start_byte``) 

1628 then "bytes */{total}" is used. 

1629 

1630 This function **ASSUMES** that if the size is not known, the caller will 

1631 not also pass an empty range. 

1632 

1633 Args: 

1634 start_byte (int): The start (inclusive) of the byte range. 

1635 end_byte (int): The end (inclusive) of the byte range. 

1636 total_bytes (Optional[int]): The number of bytes in the byte 

1637 range (if known). 

1638 

1639 Returns: 

1640 str: The content range header. 

1641 """ 

1642 if total_bytes is None: 

1643 return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte) 

1644 elif end_byte < start_byte: 

1645 return _EMPTY_RANGE_TEMPLATE.format(total_bytes) 

1646 else: 

1647 return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)
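

# --- Illustrative sketch (not part of the original module) -------------------
# The three header shapes produced above, for concreteness:


def _example_get_content_range():
    assert get_content_range(0, 99, 1000) == "bytes 0-99/1000"
    assert get_content_range(0, 99, None) == "bytes 0-99/*"
    assert get_content_range(0, -1, 0) == "bytes */0"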